{-# LANGUAGE LambdaCase          #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE RankNTypes          #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections       #-}
{-# LANGUAGE TypeFamilies        #-}
{-# LANGUAGE TypeOperators       #-}
module Ouroboros.Network.BlockFetch.Decision.Genesis
  ( -- * Genesis decision logic
    --
    -- | This module contains the part of the block fetch decisions process that is
    -- specific to the bulk sync mode. This logic reuses parts of the logic for the
    -- deadline mode, but it is inherently different.
    --
    -- Definitions:
    --
    -- - Let @inflight :: peer -> Set blk@ be the outstanding blocks, those that
    --   have been requested and are expected to arrive but have not yet.
    --
    -- - Let @peersOrder@ be an order of preference among the peers. This order is
    --   not set in stone and will evolve as we go.
    --
    -- - Let @currentPeer :: Maybe peer@ be the “current peer” with which we are
    --   interacting. If it exists, this peer must be the best according to
    --   @peersOrder@, and the last fetch request must have been sent to them.
    --
    -- - Let @currentStart :: Time@ be the latest time a fetch request was sent
    --   while there were no outstanding blocks.
    --
    -- - Let @gracePeriod@ be a small duration (eg. 10s), during which a “cold” peer
    --   is allowed to warm up (eg. grow the TCP window) before being expected to
    --   feed blocks faster than we can validate them.
    --
    -- One iteration of this decision logic:
    --
    -- 0. If @inflight(currentPeer)@ is non-empty and the block validation component
    --    has idled at any point after @currentStart@ plus @gracePeriod@, then the
    --    peer @currentPeer@ has failed to promptly serve @inflight(currentPeer)@,
    --    and:
    --
    --   - If @currentPeer@ is the ChainSync Jumping dynamo, then it must
    --     immediately be replaced as the dynamo.
    --
    --   - Stop considering the peer “current” and make them the worst according to
    --     the @peersOrder@.
    --
    -- 1. Select @theCandidate :: AnchoredFragment (Header blk)@. This is the best
    --    candidate header chain among the ChainSync clients (eg. best raw
    --    tiebreaker among the longest).
    --
    -- 2. Select @thePeer :: peer@.
    --
    --    - Let @grossRequest@ be the oldest block on @theCandidate@ that has not
    --      already been downloaded.
    --
    --    - If @grossRequest@ is empty, then terminate this iteration. Otherwise,
    --      pick the best peer (according to @peersOrder@) offering the block in
    --      @grossRequest@.
    --
    -- 3. Craft the actual request to @thePeer@ asking blocks of @theCandidate@:
    --
    --    - If the byte size of @inflight(thePeer)@ is below the low-water mark,
    --      then terminate this iteration.
    --
    --    - Decide and send the actual next batch request, as influenced by exactly
    --      which blocks are actually already currently in-flight with @thePeer@.
    --
    -- 4. If we went through the election of a new peer, replace @currentPeer@ and
    --    put the new peer at the front of @peersOrder@. Also reset @currentStart@
    --    if @inflight(thePeer)@ is empty.
    --
    -- Terminate this iteration.

    -- * About the influence of in-flight requests
    --
    -- | One can note that in-flight requests are ignored when finding a new peer, but
    -- considered when crafting the actual request to a chosen peer. This is by
    -- design. We explain the rationale here.
    --
    -- If a peer proves too slow, then we give up on it (see point 0. above), even
    -- if it has requests in-flight. In subsequent selections of peers (point 2.),
    -- the blocks in these requests will not be removed from @theCandidate@ as, as
    -- far as we know, these requests might not return (until the connection to that
    -- peer is terminated by the mini protocol timeout).
    --
    -- When crafting the actual request, we do need to consider the in-flight
    -- requests of the peer, to avoid clogging our network. If some of these
    -- in-flight requests date from when the peer was previously “current”, this
    -- means that we cycled through all the peers that provided @theCandidate@ and
    -- they all failed to serve our blocks promptly.
    --
    -- This is a degenerate case of the algorithm that might happen but only be
    -- transient. Soon enough, @theCandidate@ should be honest (if the consensus
    -- layer does its job correctly), and there should exist an honest peer ready to
    -- serve @theCandidate@ promptly.

    -- * Interactions with ChainSync Jumping (CSJ)
    --
    -- | Because we always require our peers to be able to serve a gross request
    -- with an old block, peers with longer chains have a better chance to pass
    -- this criterion and to be selected as current peer. The CSJ dynamo, being
    -- always ahead of jumpers, has therefore more chances to be selected as the
    -- current peer. It is still possible for a jumper or a disengaged peer to be
    -- selected.
    --
    -- If the current peer is the CSJ dynamo and it is a dishonest peer that retains
    -- blocks, it will get multiple opportunities to do so since it will be selected
    -- as the current peer more often. We therefore rotate the dynamo every time it
    -- is the current peer and it fails to serve blocks promptly.

    -- * About the gross request
    --
    -- | We want to select a peer that is able to serve us a batch of oldest blocks
    -- of @theCandidate@. However, not every peer will be able to deliver these
    -- batches as they might be on different chains. We therefore select a peer only
    -- if its candidate fragment contains the block in the gross request. In this
    -- way, we ensure that the peer can serve at least one block that we wish to
    -- fetch.
    --
    -- If the peer cannot offer any more blocks after that, it will be rotated out
    -- soon.
    fetchDecisionsGenesisM
  ) where

import Control.Exception (assert)
import Control.Monad (guard)
import Control.Monad.Class.MonadTime.SI (MonadMonotonicTime (getMonotonicTime),
           addTime)
import Control.Monad.Trans.Maybe (MaybeT (MaybeT, runMaybeT))
import Control.Monad.Trans.Writer.CPS (Writer, runWriter, tell)
import Control.Tracer (Tracer, traceWith)
import Data.Bifunctor (Bifunctor (..), first)
import Data.DList (DList)
import Data.DList qualified as DList
import Data.Foldable (find, toList)
import Data.List qualified as List
import Data.Maybe (maybeToList)
import Data.Sequence (Seq (..), (<|), (><), (|>))
import Data.Sequence qualified as Sequence
import Data.Set qualified as Set

import Cardano.Prelude (partitionEithers)

import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
import Ouroboros.Network.AnchoredFragment qualified as AF
import Ouroboros.Network.Block
import Ouroboros.Network.BlockFetch.ClientState (FetchRequest (..),
           PeerFetchInFlight (..), PeersOrder (..))
import Ouroboros.Network.BlockFetch.ConsensusInterface (ChainSelStarvation (..),
           FetchMode (..))
import Ouroboros.Network.BlockFetch.DeltaQ (calculatePeerFetchInFlightLimits)

import Cardano.Slotting.Slot (WithOrigin)
import Ouroboros.Network.BlockFetch.Decision
import Ouroboros.Network.BlockFetch.Decision.Trace (TraceDecisionEvent (..))


type WithDeclined peer = Writer (DList (FetchDecline, peer))

-- | Run a 'WithDeclined' computation, returning its result together with the
-- declines (each paired with the peer it concerns) accumulated along the way.
runWithDeclined :: WithDeclined peer a -> (a, DList (FetchDecline, peer))
runWithDeclined = runWriter

-- | Run one iteration of the Genesis (bulk sync) block fetch decision logic.
--
-- The steps performed are, in order:
--
-- 1. Check whether chain selection starved at or after the start time of the
--    current peer plus the grace period ('bulkSyncGracePeriod'). If so, the
--    current peer is traced with 'PeerStarvedUs', passed to the CSJ demotion
--    callback, moved to the end of the peers order and no longer considered
--    current.
--
-- 2. Align the peers order with the peers actually given as input.
--
-- 3. Compute the decision with 'fetchDecisionsGenesis', which yields at most
--    one request and a list of declines.
--
-- 4. If a request was produced, persist the peers order with the chosen peer
--    as current, either because that peer had no blocks in flight (in which
--    case the start time is also refreshed) or because the chosen peer
--    differs from the previously stored current peer.
--
-- The result lists the request (if any) as a 'Right', followed by all the
-- declines as 'Left's.
fetchDecisionsGenesisM
  :: forall peer header block m extra.
     (Ord peer,
      HasHeader header,
      HeaderHash header ~ HeaderHash block, MonadMonotonicTime m)
  => Tracer m (TraceDecisionEvent peer header)
     -- ^ Receives 'PeerStarvedUs' when the current peer is demoted.
  -> FetchDecisionPolicy header
     -- ^ The policy; 'bulkSyncGracePeriod' is read here, and the whole policy
     -- is forwarded to 'fetchDecisionsGenesis'.
  -> AnchoredFragment header
     -- ^ The current chain.
  -> (Point block -> Bool)
     -- ^ Whether the block has been fetched (only if recent, i.e. within @k@).
  -> MaxSlotNo
  -> ChainSelStarvation
     -- ^ Either the time at which the last starvation ended, or an ongoing
     -- starvation (in which case the current time is used).
  -> ( PeersOrder peer
     , PeersOrder peer -> m ()
     , peer -> m ()
     )
     -- ^ The peers order as currently stored, an action to store a new peers
     -- order, and an action invoked on the current peer when it starved us
     -- (to demote it as CSJ dynamo).
  -> [(AnchoredFragment header, PeerInfo header peer extra)]
     -- ^ The candidate fragments and their associated peers.
  -> m [(FetchDecision (FetchRequest header), PeerInfo header peer extra)]
fetchDecisionsGenesisM
  tracer
  fetchDecisionPolicy@FetchDecisionPolicy {bulkSyncGracePeriod}
  currentChain
  fetchedBlocks
  fetchedMaxSlotNo
  chainSelStarvation
  ( peersOrder0,
    writePeersOrder,
    demoteCSJDynamo
    )
  candidatesAndPeers = do
    peersOrder1 <- checkLastChainSelStarvation peersOrder0

    let (peersOrder, orderedCandidatesAndPeers) =
          alignPeersOrderWithActualPeers
            (peerInfoPeer . snd)
            (Sequence.fromList candidatesAndPeers)
            peersOrder1

    -- Compute the actual block fetch decision. This contains only declines and
    -- at most one request. 'theDecision' is therefore a 'Maybe'.
    let (theDecision, declines) =
          fetchDecisionsGenesis
            fetchDecisionPolicy
            currentChain
            fetchedBlocks
            fetchedMaxSlotNo
            (toList orderedCandidatesAndPeers)

        newCurrentPeer = peerInfoPeer . snd <$> theDecision

    -- NOTE(review): the peers order is only written back in the two guarded
    -- branches below. In particular, a rotation performed by
    -- 'checkLastChainSelStarvation' is not persisted when no request is made;
    -- confirm that this is intended.
    case theDecision of
      Just (_, (_, inflight, _, _, _))
        -- If there were no blocks in flight, then this will be the first
        -- request, so we take a new current time.
        | Set.null (peerFetchBlocksInFlight inflight) -> do
            peersOrderStart <- getMonotonicTime
            writePeersOrder $ setCurrentPeer newCurrentPeer peersOrder
              { peersOrderStart }
        -- If the new current peer is not the old one, then we update the
        -- current peer.
        | newCurrentPeer /= peersOrderCurrent peersOrder0 ->
            writePeersOrder $ setCurrentPeer newCurrentPeer peersOrder
      _ -> pure ()

    pure $
      map (first Right) (maybeToList theDecision)
        ++ map (first Left) declines
    where
      -- Align the peers order with the actual peers; this consists in removing
      -- all peers from the peers order that are not in the actual peers list and
      -- adding at the end of the peers order all the actual peers that were not
      -- there before. Returns the aligned peers order together with the actual
      -- peers data sorted in that same order.
      alignPeersOrderWithActualPeers
        :: forall d.
           (d -> peer)
        -> Seq d
        -> PeersOrder peer
        -> (PeersOrder peer, Seq d)
      alignPeersOrderWithActualPeers
        peerOf
        actualPeers
        PeersOrder {peersOrderStart, peersOrderCurrent, peersOrderAll} =
          let peersOrderAll' :: Seq d
              peersOrderAll' =
                    -- Known peers that are still present, in their stored order...
                    foldr (\p ds ->
                            case find ((p ==) . peerOf) actualPeers of
                              Just d  -> d <| ds
                              Nothing -> ds
                          )
                          Sequence.empty
                          peersOrderAll
                    -- ...followed by the peers that were not known before.
                 >< Sequence.filter ((`notElem` peersOrderAll) . peerOf) actualPeers
              -- Set the current peer to Nothing if it is not at the front of
              -- the list.
              peersOrderCurrent' :: Maybe peer
              peersOrderCurrent' = do
                peer <- peersOrderCurrent
                guard $ case peersOrderAll' of
                  d Sequence.:<| _ -> peerOf d == peer
                  Sequence.Empty   -> False
                pure peer
           in (PeersOrder
                { peersOrderCurrent = peersOrderCurrent',
                  -- INVARIANT met: Current peer is at the front if it exists
                  peersOrderAll = fmap peerOf peersOrderAll',
                  peersOrderStart
                }
              , peersOrderAll'
              )

      -- If the chain selection has been starved recently, that is after the
      -- current peer started (and a grace period), then the current peer is
      -- bad. We push it at the end of the queue, demote it from CSJ dynamo,
      -- and ignore its in-flight blocks for the future.
      checkLastChainSelStarvation :: PeersOrder peer -> m (PeersOrder peer)
      checkLastChainSelStarvation
        peersOrder@PeersOrder {peersOrderStart, peersOrderCurrent, peersOrderAll} = do
          -- An ongoing starvation counts as a starvation happening right now.
          lastStarvationTime <- case chainSelStarvation of
            ChainSelStarvationEndedAt time -> pure time
            ChainSelStarvationOngoing      -> getMonotonicTime
          case peersOrderCurrent of
            Just peer
              | lastStarvationTime >= addTime bulkSyncGracePeriod peersOrderStart -> do
                  traceWith tracer (PeerStarvedUs peer)
                  demoteCSJDynamo peer
                  pure PeersOrder
                         {
                           peersOrderCurrent = Nothing,
                           -- INVARIANT met: there is no current peer
                           --
                           -- Dropping the first element removes the current
                           -- peer only if it sits at the front of the order.
                           peersOrderAll = Sequence.drop 1 peersOrderAll |> peer,
                           peersOrderStart
                         }
            _ -> pure peersOrder

      -- Make the given peer the current one and move it to the front of the
      -- peers order. If no peer is given, or if the given peer is not part of
      -- the peers order, then there is no current peer anymore.
      setCurrentPeer :: Maybe peer -> PeersOrder peer -> PeersOrder peer
      setCurrentPeer Nothing peersOrder = peersOrder {peersOrderCurrent = Nothing}
      setCurrentPeer (Just peer) peersOrder =
        case Sequence.breakl (peer ==) (peersOrderAll peersOrder) of
          (xs, p :<| ys) ->
            peersOrder
              { peersOrderCurrent = Just p,
                -- INVARIANT met: Current peer is at the front
                peersOrderAll = p <| xs >< ys
              }
          (_, Empty) -> peersOrder {peersOrderCurrent = Nothing}

-- | Given a list of candidate fragments and their associated peers, choose what
-- to sync from who in the bulk sync mode.
--
-- The decision is built in four steps, each of which may stop the process (in
-- which case no request is produced) and may record declines:
--
-- 1. select the candidate to sync from ('selectTheCandidate');
--
-- 2. drop from it the blocks that have already been fetched
--    ('dropAlreadyFetched'); if that yields a decline, it is recorded for all
--    the peers remaining after step 1;
--
-- 3. select the peer to sync from ('selectThePeer');
--
-- 4. craft the request for that peer ('makeFetchRequest').
fetchDecisionsGenesis
  :: forall header block peer extra.
     ( HasHeader header
     , HeaderHash header ~ HeaderHash block
     )
  => FetchDecisionPolicy header
  -> AnchoredFragment header
     -- ^ The current chain, anchored at the immutable tip.
  -> (Point block -> Bool)
     -- ^ Whether the block has been fetched (only if recent, i.e. within @k@).
  -> MaxSlotNo
  -> [(AnchoredFragment header, PeerInfo header peer extra)]
     -- ^ Association list of the candidate fragments and their associated peers.
     -- The candidate fragments are anchored in the current chain (not necessarily
     -- at the tip; and not necessarily forking off immediately).
  -> ( Maybe (FetchRequest header, PeerInfo header peer extra),
       [(FetchDecline, PeerInfo header peer extra)]
     )
     -- ^ Association list of the requests and their associated peers. There is at
     -- most one accepted request; everything else is declined. Morally, this is a
     -- map from peers to @'FetchDecision' ('FetchRequest' header)@ with at most
     -- one @'FetchRequest' header@.
fetchDecisionsGenesis
  fetchDecisionPolicy
  currentChain
  fetchedBlocks
  fetchedMaxSlotNo
  candidatesAndPeers = combineWithDeclined $ do
    -- Step 1: Select the candidate to sync from. This already eliminates peers
    -- that have an implausible candidate. It returns the remaining candidates
    -- (with their corresponding peer) as suffixes of the immutable tip.
    ( theCandidate :: ChainSuffix header,
      candidatesAndPeers' :: [(ChainSuffix header, PeerInfo header peer extra)]
      ) <-
      MaybeT $
        selectTheCandidate
          fetchDecisionPolicy
          currentChain
          candidatesAndPeers

    -- Step 2: Filter out from the chosen candidate fragment the blocks that
    -- have already been downloaded. NOTE: if not declined, @theFragments@ is
    -- guaranteed to be non-empty.
    theFragments :: CandidateFragments header
      <- MaybeT $ dropAlreadyFetchedBlocks candidatesAndPeers' theCandidate

    -- Step 3: Select the peer to sync from. This eliminates peers that cannot
    -- serve a reasonable batch of the candidate, then chooses the peer to sync
    -- from, then again declines the others.
    ( thePeerCandidate :: ChainSuffix header,
      thePeer :: PeerInfo header peer extra
      ) <-
      MaybeT $ selectThePeer theFragments candidatesAndPeers'

    -- Step 4: Fetch the candidate from the selected peer, potentially declining
    -- it (eg. if the peer is already too busy).
    MaybeT $
      makeFetchRequest
        fetchDecisionPolicy
        theFragments
        thePeer
        thePeerCandidate
    where
      -- Run the whole computation, yielding the optional result and the
      -- accumulated declines as a plain list.
      combineWithDeclined
        :: forall peerInfo a.
           MaybeT (WithDeclined peerInfo) (a, peerInfo)
        -> ( Maybe (a, peerInfo),
             [(FetchDecline, peerInfo)]
           )
      combineWithDeclined = second DList.toList . runWithDeclined . runMaybeT

      -- Wrapper around 'dropAlreadyFetched': when it declines the candidate,
      -- record that decline for every given peer and yield 'Nothing'.
      dropAlreadyFetchedBlocks
        :: forall peerInfo.
           [(ChainSuffix header, peerInfo)]
        -> ChainSuffix header
        -> WithDeclined peerInfo (Maybe (CandidateFragments header))
      dropAlreadyFetchedBlocks candidatesAndPeers' theCandidate =
        case dropAlreadyFetched fetchedBlocks fetchedMaxSlotNo theCandidate of
          Left reason -> do
            tell (DList.fromList [(reason, peerInfo) | (_, peerInfo) <- candidatesAndPeers'])
            pure Nothing
          Right theFragments -> pure (Just theFragments)

-- | Find the fragments of the chain suffix that we still need to fetch because
-- they are covering blocks that have not yet been fetched.
--
-- Typically this is a single fragment forming a suffix of the chain, but in
-- the general case we can get a bunch of discontiguous chain fragments.
--
-- The result is 'FetchDeclineAlreadyFetched' if no fragment remains;
-- otherwise, it is the given candidate paired with the (non-empty) list of
-- remaining fragments.
--
-- See also 'dropAlreadyInFlightWithPeer'.
-- Similar to 'filterNotAlreadyFetched'.
dropAlreadyFetched
  :: (HasHeader header, HeaderHash header ~ HeaderHash block)
  => (Point block -> Bool)
     -- ^ Whether the block has been fetched (only if recent, i.e. within @k@).
  -> MaxSlotNo
  -> ChainSuffix header
  -> FetchDecision (CandidateFragments header)
dropAlreadyFetched alreadyDownloaded fetchedMaxSlotNo candidate =
  if null fragments
    then Left FetchDeclineAlreadyFetched
    else Right (candidate, fragments)
  where
    fragments = filterWithMaxSlotNo notAlreadyFetched fetchedMaxSlotNo (getChainSuffix candidate)
    -- The predicate is expressed on block points, so we go from the header to
    -- its point, cast to a block point.
    notAlreadyFetched = not . alreadyDownloaded . castPoint . blockPoint

-- | Given a list of candidate fragments and their associated peers, select the
-- candidate to sync from. Return this fragment, the list of peers that are
-- still in race to serve it, and the list of peers that are already being
-- declined.
selectTheCandidate
  :: forall header peerInfo.
     HasHeader header
  => FetchDecisionPolicy header
  -> AnchoredFragment header
     -- ^ The current chain.
  -> [(AnchoredFragment header, peerInfo)]
     -- ^ The candidate fragments and their associated peers.
  -> WithDeclined
       peerInfo
       (Maybe (ChainSuffix header, [(ChainSuffix header, peerInfo)]))
     -- ^ The pair of: (a) a list of peers that we have decided are not right,
     -- eg. because they presented us with a chain forking too deep, and (b) the
     -- selected candidate that we choose to sync from and a list of peers that
     -- are still in the race to serve that candidate.
selectTheCandidate
  FetchDecisionPolicy {compareCandidateChains, plausibleCandidateChain}
  currentChain =
        separateDeclinedAndStillInRace
        -- Select the suffix up to the intersection with the current chain. This can
        -- eliminate candidates that fork too deep.
      . selectForkSuffixes currentChain
        -- Filter to keep chains the consensus layer tells us are plausible.
      . filterPlausibleCandidates plausibleCandidateChain currentChain
    where
      -- Write all of the declined peers, and find the longest candidate
      -- fragment if there is any.
      separateDeclinedAndStillInRace
        :: [(FetchDecision (ChainSuffix header), peerInfo)]
        -> WithDeclined peerInfo (Maybe (ChainSuffix header, [(ChainSuffix header, peerInfo)]))
      separateDeclinedAndStillInRace decisions = do
        -- Pair every decision with its peer, then split the declined ones
        -- ('Left') from the ones still in the race ('Right').
        let (declined, inRace) = partitionEithers
              [ bimap (,p) (,p) d | (d, p) <- decisions ]
        tell (DList.fromList declined)
        case inRace of
          [] -> pure Nothing
          -- The list is non-empty here, so 'List.foldl1'' below is safe.
          _ : _ -> do
            -- Keep the left operand unless the right one is strictly better
            -- according to 'compareCandidateChains'.
            let maxChainOn f c0 c1 =
                  case compareCandidateChains (f c0) (f c1) of
                    LT -> c1
                    _  -> c0
                -- maximumBy yields the last element in case of a tie while we
                -- prefer the first one
                chainSfx =
                  fst $ List.foldl1' (maxChainOn (getChainSuffix . fst)) inRace
            pure $ Just (chainSfx, inRace)

-- | Given _the_ candidate fragment to sync from, and a list of peers (with
-- their corresponding candidate fragments), choose which peer to sync _the_
-- candidate fragment from.
--
-- We first filter out all the peers that cannot even serve a reasonable batch
-- of _the_ candidate fragment, and then we choose the first one according to
-- the ordering passed as argument.
--
-- PRECONDITION: The set of peers must be included in the peer order queue.
-- PRECONDITION: The given candidate fragments must not be empty.
selectThePeer
  :: forall header peer extra.
     HasHeader header
  => CandidateFragments header
     -- ^ The candidate fragment that we have selected to sync from, as suffix
     -- of the immutable tip.
  -> [(ChainSuffix header, PeerInfo header peer extra)]
     -- ^ Association list of candidate fragments (as suffixes of the immutable
     -- tip) and their associated peers.
  -> WithDeclined
       (PeerInfo header peer extra)
       (Maybe (ChainSuffix header, PeerInfo header peer extra))
     -- ^ The first peer (with its candidate) able to serve the gross request,
     -- if there is one. Every other peer is written out as declined.
selectThePeer
  theFragments
  candidates = do
    -- Create a fetch request for the blocks in question. The request has exactly
    -- 1 block. It will only be used to choose the peer to fetch from, but we will
    -- later craft a more refined request for that peer. See [About the gross
    -- request] in the module documentation. Because @theFragments@ is not
    -- empty, and does not contain empty fragments, @grossRequest@ will not be empty.
    let firstBlock :: [AF.AnchoredSeq (WithOrigin SlotNo) (AF.Anchor header) header]
                   -> [AF.AnchoredSeq (WithOrigin SlotNo) (AF.Anchor header) header]
        firstBlock = map (AF.takeOldest 1) . take 1 . filter (not . AF.null)

        fetchRequestFragments :: [AF.AnchoredSeq (WithOrigin SlotNo) (AF.Anchor header) header]
        fetchRequestFragments = firstBlock $ snd theFragments

        grossRequest :: FetchRequest header
        grossRequest = assert (all (not . AF.null) fetchRequestFragments)
                     $ FetchRequest { fetchRequestFragments }

    -- Return the first peer that can serve the gross request and decline
    -- the other peers.
    go grossRequest candidates
  where
    -- Walk the peers in the given order. The first peer whose candidate
    -- contains the head of the gross request wins; peers after it are declined
    -- with a concurrency-limit reason, peers before it with
    -- 'FetchDeclineAlreadyFetched'.
    go grossRequest (c@(candidate, peerInfo) : xs) = do
      if grossRequest `requestHeadInCandidate` candidate then do
        tell $ DList.fromList
          [(FetchDeclineConcurrencyLimit FetchModeGenesis 1, pInfo)
          | (_, pInfo) <- xs
          ]
        pure (Just c)
      else do
        tell $ DList.fromList [(FetchDeclineAlreadyFetched, peerInfo)]
        go grossRequest xs
    go _grossRequest [] = pure Nothing


    -- Whether the head point of the last fragment of the request lies within
    -- the bounds of the peer's candidate. A request without fragments is never
    -- considered to be in the candidate.
    requestHeadInCandidate :: FetchRequest header -> ChainSuffix header -> Bool
    requestHeadInCandidate request candidate =
      case fetchRequestFragments request of
        -- The pattern guarantees a non-empty list, so 'last' cannot fail.
        fragments@(_:_)
          | AF.withinFragmentBounds
              (AF.headPoint $ last fragments)
              (getChainSuffix candidate)
          ->
            True
        _ ->
            False

-- | Given a candidate and a peer to sync from, create a request for that
-- specific peer. We might take the 'FetchDecision' to decline the request, but
-- only for “good” reasons, eg. if the peer is already too busy.
makeFetchRequest
  :: HasHeader header
  => FetchDecisionPolicy header
  -> CandidateFragments header
     -- ^ The candidate fragment that we have selected to sync from, as suffix of
     -- the immutable tip.
  -> PeerInfo header peer extra
     -- ^ The peer that we have selected to sync from.
  -> ChainSuffix header
     -- ^ Its candidate fragment as suffix of the immutable tip.
  -> WithDeclined
       (PeerInfo header peer extra)
       (Maybe (FetchRequest header, PeerInfo header peer extra))
makeFetchRequest
  fetchDecisionPolicy
  theFragments
  thePeer@(status, inflight, gsvs, _, _)
  thePeerCandidate =
    let theDecision = do
          -- Drop blocks that are already in-flight with this peer.
          fragments <- dropAlreadyInFlightWithPeer inflight theFragments

          -- Trim the fragments to the peer's candidate, keeping only blocks that
          -- they may actually serve.
          trimmedFragments <- snd fragments `trimFragmentsToCandidate` thePeerCandidate

          -- Try to create a request for those fragments.
          fetchRequestDecision
            fetchDecisionPolicy
            FetchModeBulkSync
            0 -- bypass all concurrency limits.
            (calculatePeerFetchInFlightLimits gsvs)
            inflight
            status
            (Right trimmedFragments)
     in case theDecision of
          -- A declined decision is recorded against the peer and yields no
          -- request.
          Left reason -> tell (DList.fromList [(reason, thePeer)]) >> pure Nothing
          Right theRequest -> pure $ Just (theRequest, thePeer)
    where
      -- For each fragment, keep the second component returned by
      -- 'AF.intersect' against the peer's candidate, dropping fragments that
      -- do not intersect or whose kept part is empty. Declines with
      -- 'FetchDeclineAlreadyFetched' when nothing remains.
      trimFragmentsToCandidate fragments candidate =
        let trimmedFragments =
              [ prefix
              | fragment <- fragments
              , Just (_, prefix, _, _) <- [AF.intersect (getChainSuffix candidate) fragment]
              , not (AF.null prefix)
              ]
         in if null trimmedFragments
              then Left FetchDeclineAlreadyFetched
              else Right trimmedFragments

-- | Find the fragments of the chain suffix that we still need to fetch because
-- they are covering blocks that are not currently in the process of being
-- fetched from this peer.
--
-- Typically this is a single fragment forming a suffix of the chain, but in
-- the general case we can get a bunch of discontiguous chain fragments.
--
-- See also 'dropAlreadyFetched'.
--
-- Similar to 'filterNotAlreadyInFlightWithPeer'.
dropAlreadyInFlightWithPeer ::
  (HasHeader header) =>
  PeerFetchInFlight header ->
  CandidateFragments header ->
  FetchDecision (CandidateFragments header)
dropAlreadyInFlightWithPeer inflight (candidate, chainfragments) =
  -- Decline when every block is already in flight with this peer; otherwise
  -- keep the candidate and the fragments that are still to be requested.
  if null fragments
    then Left FetchDeclineInFlightThisPeer
    else Right (candidate, fragments)
  where
    -- Each input fragment may be split into several pieces by the filter, so
    -- the per-fragment results are concatenated.
    fragments =
      concatMap
        (filterWithMaxSlotNo notAlreadyInFlight (peerFetchMaxSlotNo inflight))
        chainfragments

    -- A header still needs fetching when its point is not among the blocks
    -- recorded as in flight for this peer.
    notAlreadyInFlight b =
      blockPoint b `Set.notMember` peerFetchBlocksInFlight inflight