{-# LANGUAGE BangPatterns        #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE RankNTypes          #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies        #-}
{-# LANGUAGE TypeOperators       #-}

module Ouroboros.Network.BlockFetch.Decision
  ( -- * Deciding what to fetch
    fetchDecisions
  , FetchDecisionPolicy (..)
  , FetchMode (..)
  , PeerInfo
  , FetchDecision
  , FetchDecline (..)
    -- ** Components of the decision-making process
  , filterPlausibleCandidates
  , selectForkSuffixes
  , filterNotAlreadyFetched
  , filterNotAlreadyInFlightWithPeer
  , prioritisePeerChains
  , filterNotAlreadyInFlightWithOtherPeers
  , fetchRequestDecisions
  ) where

import Data.Set qualified as Set

import Data.Function (on)
import Data.Hashable
import Data.List as List (foldl', groupBy, sortBy, transpose)
import Data.Maybe (mapMaybe)
import Data.Set (Set)
import GHC.Stack (HasCallStack)

import Control.Exception (assert)
import Control.Monad (guard)
import Control.Monad.Class.MonadTime.SI (DiffTime)

import Ouroboros.Network.AnchoredFragment (AnchoredFragment, AnchoredSeq (..))
import Ouroboros.Network.AnchoredFragment qualified as AF
import Ouroboros.Network.Block
import Ouroboros.Network.Point (withOriginToMaybe)

import Ouroboros.Network.BlockFetch.ClientState (FetchRequest (..),
           PeerFetchInFlight (..), PeerFetchStatus (..))
import Ouroboros.Network.BlockFetch.ConsensusInterface (FetchMode (..))
import Ouroboros.Network.BlockFetch.DeltaQ (PeerFetchInFlightLimits (..),
           PeerGSV (..), SizeInBytes, calculatePeerFetchInFlightLimits,
           comparePeerGSV, comparePeerGSV', estimateExpectedResponseDuration,
           estimateResponseDeadlineProbability)


-- | The tunable limits and consensus-supplied callbacks that parameterise the
-- block fetch decision logic (see 'fetchDecisions').
--
data FetchDecisionPolicy header = FetchDecisionPolicy {
       -- | Per-peer limit on in-flight requests; see
       -- 'FetchDeclineReqsInFlightLimit'. A protocol constant.
       maxInFlightReqsPerPeer  :: Word,

       -- | Concurrency limit reported by 'FetchDeclineConcurrencyLimit';
       -- presumably the one that applies in bulk-sync 'FetchMode' (confirm
       -- against 'fetchRequestDecisions').
       maxConcurrencyBulkSync  :: Word,

       -- | Concurrency limit reported by 'FetchDeclineConcurrencyLimit';
       -- presumably the one that applies in deadline 'FetchMode' (confirm
       -- against 'fetchRequestDecisions').
       maxConcurrencyDeadline  :: Word,

       -- | NOTE(review): not consumed by 'fetchDecisions'; presumably the
       -- period between runs of the decision logic -- confirm at the use site.
       decisionLoopInterval    :: DiffTime,

       -- | Salt handed to 'prioritisePeerChains' by 'fetchDecisions'.
       -- NOTE(review): presumably mixed into peer hashes when ordering
       -- peers -- confirm.
       peerSalt                :: Int,

       -- | Predicate used by 'filterPlausibleCandidates'; it is applied as
       -- @plausibleCandidateChain currentChain candidateChain@.
       plausibleCandidateChain :: HasCallStack
                               => AnchoredFragment header
                               -> AnchoredFragment header -> Bool,

       -- | Ordering on candidate chains, handed to 'prioritisePeerChains'.
       compareCandidateChains  :: HasCallStack
                               => AnchoredFragment header
                               -> AnchoredFragment header
                               -> Ordering,

       -- | Size in bytes associated with a header's block, handed to
       -- 'prioritisePeerChains'.
       blockFetchSize          :: header -> SizeInBytes
     }


-- | The per-peer information carried through the decision pipeline, as a
-- 5-tuple of: the peer's fetch status, its in-flight state, its 'PeerGSV'
-- value, the peer identifier, and an @extra@ component that the code in
-- 'fetchDecisions' never inspects.
--
-- 'fetchDecisions' threads the whole tuple through every stage so that it can
-- be returned alongside each decision.
--
type PeerInfo header peer extra =
       ( PeerFetchStatus header,
         PeerFetchInFlight header,
         PeerGSV,
         peer,
         extra
       )

-- | Throughout the decision making process we accumulate reasons to decline
-- to fetch any blocks. This type is used to wrap intermediate and final
-- results.
--
-- A @Left@ value carries the 'FetchDecline' reason; a @Right@ value carries
-- the (intermediate or final) result.
--
type FetchDecision result = Either FetchDecline result

-- | All the various reasons we can decide not to fetch blocks from a peer.
--
-- It is worth highlighting which of these reasons result from competition
-- among upstream peers.
--
-- * 'FetchDeclineInFlightOtherPeer': decline this peer because all the
--   unfetched blocks of its candidate chain have already been requested from
--   other peers. This reason reflects the least-consequential competition
--   among peers: the competition that determines merely which upstream peer to
--   burden with the request (eg the one with the best
--   'Ouroboros.Network.BlockFetch.DeltaQ.DeltaQ' metrics). The consequences
--   are relatively minor because the unfetched blocks on this peer's candidate
--   chain will be requested regardless; it's merely a question of "From whom?".
--   (One exception: if an adversarial peer wins this competition such that the
--   blocks are only requested from them, then it may be possible that this
--   decision determines whether the blocks are ever /received/. But that
--   depends on details of timeouts, a longer competing chain being soon
--   received within those timeouts, and so on.)
--
-- * 'FetchDeclineChainNotPlausible': decline this peer because the node has
--   already fetched, validated, and selected a chain better than its candidate
--   chain from other peers (or from the node's own block forge). Because the
--   node's current selection is influenced by what blocks other peers have
--   recently served (or it recently minted), this reason reflects that peers
--   /indirectly/ compete by serving as long of a chain as possible and as
--   promptly as possible. When the tips of the peers' selections are all
--   within their respective forecast horizons (see
--   'Ouroboros.Consensus.Ledger.SupportsProtocol.ledgerViewForecastAt'), then
--   the length of their candidate chains will typically be the length of their
--   selections, since the ChainSync is free to race ahead (in contrast, the
--   BlockFetch pipeline depth is bounded such that it will, for a syncing
--   node, not be able to request all blocks between the selection and the end
--   of the forecast window). But if one or more of their tips is beyond the
--   horizon, then the relative length of the candidate chains is more
--   complicated, influenced by both the relative density of the chains'
--   suffixes and the relative age of the chains' intersection with the node's
--   selection (since each peer's forecast horizon is a fixed number of slots
--   after the candidate's successor of that intersection).
--
-- * 'FetchDeclineConcurrencyLimit': decline this peer while the node has
--   already fully allocated the artificially scarce 'maxConcurrentFetchPeers'
--   resource amongst its other peers. This reason reflects the
--   least-fundamental competition: it's the only way a node would decline a
--   candidate chain C that it would immediately switch to if C had somehow
--   already been fetched (and any better current candidates hadn't). It is
--   possible that this peer's candidate fragment is better than the candidate
--   fragments of other peers, but that should only happen ephemerally (eg for
--   a brief while immediately after first connecting to this peer).
--
-- * 'FetchDeclineChainIntersectionTooDeep': decline this peer because the node's
--   selection has more than @K@ blocks that are not on this peer's candidate
--   chain. Typically, this reason occurs after the node has been declined---ie
--   lost the above competitions---for a long enough duration. This decision
--   only arises if the BlockFetch decision logic wins a harmless race against
--   the ChainSync client once the node's selection gets longer, since
--   'Ouroboros.Consensus.MiniProtocol.ChainSync.Client.ForkTooDeep'
--   disconnects from such a peer.
--
data FetchDecline =
     -- | This peer's candidate chain is not longer than our chain. For more
     -- details see
     -- 'Ouroboros.Consensus.MiniProtocol.BlockFetch.ClientInterface.mkBlockFetchConsensusInterface'
     -- which implements 'plausibleCandidateChain'.
     --
     FetchDeclineChainNotPlausible

     -- | Switching to this peer's candidate chain would require rolling back
     -- more than @K@ blocks.
     --
   | FetchDeclineChainIntersectionTooDeep

     -- | Every block on this peer's candidate chain has already been fetched.
     --
   | FetchDeclineAlreadyFetched

     -- | This peer's candidate chain has already been requested from this
     -- peer.
     --
   | FetchDeclineInFlightThisPeer

     -- | Some blocks on this peer's candidate chain have not yet been fetched,
     -- but all of those have already been requested from other peers.
     --
   | FetchDeclineInFlightOtherPeer

     -- | This peer's BlockFetch client is shutting down, see
     -- 'PeerFetchStatusShutdown'.
     --
   | FetchDeclinePeerShutdown

     -- | Blockfetch is starting up and waiting on corresponding Chainsync.
     --
   | FetchDeclinePeerStarting


   -- The reasons above this comment are fundamental and/or obvious. On the
   -- other hand, the reasons below are heuristic.


     -- | This peer is in a potentially-temporary state in which it has not
     -- responded to us within a certain expected time limit, see
     -- 'PeerFetchStatusAberrant'.
     --
   | FetchDeclinePeerSlow

     -- | This peer is not under the 'maxInFlightReqsPerPeer' limit.
     --
     -- The argument is the 'maxInFlightReqsPerPeer' constant.
     --
   | FetchDeclineReqsInFlightLimit  !Word

     -- | This peer is not under the 'inFlightBytesHighWatermark' bytes limit.
     --
     -- The arguments are:
     --
     -- * number of bytes currently in flight for that peer
     -- * the configured 'inFlightBytesLowWatermark' constant
     -- * the configured 'inFlightBytesHighWatermark' constant
     --
   | FetchDeclineBytesInFlightLimit !SizeInBytes !SizeInBytes !SizeInBytes

     -- | This peer is not under the 'inFlightBytesLowWatermark'.
     --
     -- The arguments are:
     --
     -- * number of bytes currently in flight for that peer
     -- * the configured 'inFlightBytesLowWatermark' constant
     -- * the configured 'inFlightBytesHighWatermark' constant
     --
   | FetchDeclinePeerBusy           !SizeInBytes !SizeInBytes !SizeInBytes

     -- | The node is not under the 'maxConcurrentFetchPeers' limit.
     --
     -- The arguments are:
     --
     -- * the current 'FetchMode'
     -- * the corresponding configured limit constant, either
     --   'maxConcurrencyBulkSync' or 'maxConcurrencyDeadline'
     --
   | FetchDeclineConcurrencyLimit   !FetchMode !Word
  deriving (Eq, Show)


-- | The \"oh noes?!\" operator.
--
-- Converts a 'Maybe' into an 'Either': @Just x@ becomes @Right x@, while
-- 'Nothing' becomes @Left e@ for the error value @e@ supplied on the right.
--
(?!) :: Maybe a -> e -> Either e a
Just x  ?! _ = Right x
Nothing ?! e = Left  e

-- | The combination of a 'ChainSuffix' and a list of discontiguous
-- 'AnchoredFragment's:
--
-- * When comparing two 'CandidateFragments' as candidate chains, we use the
--   'ChainSuffix'.
--
-- * To track which blocks of that candidate still have to be downloaded, we
--   use a list of discontiguous 'AnchoredFragment's.
--
-- The 'ChainSuffix' is the first component of the pair and the list of
-- fragments is the second.
--
type CandidateFragments header = (ChainSuffix header, [AnchoredFragment header])


-- | Given the candidate chain offered by each peer, decide for every
-- (chain, peer) pair either what to request from that peer or why we decline
-- to request anything.
--
-- The definition is a right-to-left function composition, so the stages run
-- from the bottom of the definition upwards:
--
-- 1. 'filterPlausibleCandidates'
-- 2. 'selectForkSuffixes'
-- 3. 'filterNotAlreadyFetched'
-- 4. 'filterNotAlreadyInFlightWithPeer'
-- 5. 'prioritisePeerChains'
-- 6. 'filterNotAlreadyInFlightWithOtherPeers'
-- 7. 'fetchRequestDecisions'
--
-- Between stages the local @swizzle*@ helpers copy the components of
-- 'PeerInfo' that the next stage needs into their own tuple positions, while
-- keeping the complete 'PeerInfo' as the last component.
--
fetchDecisions
  :: (Ord peer,
      Hashable peer,
      HasHeader header,
      HeaderHash header ~ HeaderHash block)
  => FetchDecisionPolicy header
  -> FetchMode
  -> AnchoredFragment header
     -- ^ The current chain
  -> (Point block -> Bool)
     -- ^ Predicate handed to 'filterNotAlreadyFetched' (presumably: has the
     -- block at this point already been fetched?)
  -> MaxSlotNo
     -- ^ Handed to 'filterNotAlreadyFetched' together with the predicate
  -> [(AnchoredFragment header, PeerInfo header peer extra)]
     -- ^ Each peer's candidate chain, paired with that peer's info
  -> [(FetchDecision (FetchRequest header), PeerInfo header peer extra)]
fetchDecisions fetchDecisionPolicy@FetchDecisionPolicy {
                 plausibleCandidateChain,
                 compareCandidateChains,
                 blockFetchSize,
                 peerSalt
               }
               fetchMode
               currentChain
               fetchedBlocks
               fetchedMaxSlotNo =

    -- Finally, make a decision for each (chain, peer) pair.
    fetchRequestDecisions
      fetchDecisionPolicy
      fetchMode
  . map swizzleSIG

    -- Filter to keep blocks that are not already in-flight with other peers.
  . filterNotAlreadyInFlightWithOtherPeers
      fetchMode
  . map swizzleSI

    -- Reorder chains based on consensus policy and network timing data.
  . prioritisePeerChains
      fetchMode
      peerSalt
      compareCandidateChains
      blockFetchSize
  . map swizzleIG

    -- Filter to keep blocks that are not already in-flight for this peer.
  . filterNotAlreadyInFlightWithPeer
  . map swizzleI

    -- Filter to keep blocks that have not already been downloaded.
  . filterNotAlreadyFetched
      fetchedBlocks
      fetchedMaxSlotNo

    -- Select the suffix up to the intersection with the current chain.
  . selectForkSuffixes
      currentChain

    -- First, filter to keep chains the consensus layer tells us are plausible.
  . filterPlausibleCandidates
      plausibleCandidateChain
      currentChain
  where
    -- Data swizzling functions to get the right info into each stage.
    --
    -- The letters in each name say which 'PeerInfo' components are copied
    -- out: S = status, I = in-flight, G = GSVs (together with the peer).
    swizzleI   (c, p@(_,     inflight,_,_,      _)) = (c,         inflight,       p)
    swizzleIG  (c, p@(_,     inflight,gsvs,peer,_)) = (c,         inflight, gsvs, peer, p)
    swizzleSI  (c, p@(status,inflight,_,_,      _)) = (c, status, inflight,       p)
    swizzleSIG (c, p@(status,inflight,gsvs,peer,_)) = (c, status, inflight, gsvs, peer, p)

{-
We have the node's /current/ or /adopted/ chain. This is the node's chain in
the sense specified by the Ouroboros algorithm. It is a fully verified chain
with block bodies and a ledger state.

    ┆   ┆
    ├───┤
    │   │
    ├───┤
    │   │
    ├───┤
    │   │
    ├───┤
    │   │
 ───┴───┴─── current chain length (block number)

With chain selection we are interested in /candidate/ chains. We have these
candidate chains in the form of chains of verified headers, but without bodies.

The consensus layer gives us the current set of candidate chains from our peers
and we have the task of selecting which block bodies to download, and then
passing those block bodies back to the consensus layer. The consensus layer will
try to validate them and decide if it wants to update its current chain.

    ┆   ┆     ┆   ┆     ┆   ┆     ┆   ┆     ┆   ┆
    ├───┤     ├───┤     ├───┤     ├───┤     ├───┤
    │   │     │   │     │   │     │   │     │   │
    ├───┤     ├───┤     ├───┤     ├───┤     ├───┤
    │   │     │   │     │   │     │   │     │   │
    ├───┤     ├───┤     ├───┤     ├───┤     ├───┤
    │   │     │   │     │   │     │   │     │   │
    ├───┤     ├───┤     ├───┤     ├───┤     └───┘
    │   │     │   │     │   │     │   │
 ───┴───┴─────┼───┼─────┼───┼─────┼───┼───────────── current chain length
              │   │     │   │     │   │
  current     ├───┤     ├───┤     └───┘
  (blocks)    │   │     │   │
              └───┘     └───┘
                A         B         C         D
             candidates
             (headers)

In this example we have four candidate chains, with all but chain D strictly
longer than our current chain.

In general there are many candidate chains. We make a distinction between a
candidate chain and the peer from which it is available. It is often the
case that the same chain is available from multiple peers. We will try to be
clear about when we are referring to chains or the combination of a chain and
the peer from which it is available.

For the sake of the example let us assume we have the four chains above
available from the following peers.

peer    1         2         3         4         5         6         7
      ┆   ┆     ┆   ┆     ┆   ┆     ┆   ┆     ┆   ┆     ┆   ┆     ┆   ┆
      ├───┤     ├───┤     ├───┤     ├───┤     ├───┤     ├───┤     ├───┤
      │   │     │   │     │   │     │   │     │   │     │   │     │   │
      ├───┤     ├───┤     ├───┤     ├───┤     └───┘     ├───┤     ├───┤
      │   │     │   │     │   │     │   │               │   │     │   │
    ──┼───┼─────┼───┼─────┼───┼─────┼───┼───────────────┼───┼─────┼───┼──
      │   │     │   │     │   │     │   │               │   │     │   │
      └───┘     ├───┤     ├───┤     ├───┤               ├───┤     ├───┤
                │   │     │   │     │   │               │   │     │   │
                └───┘     └───┘     └───┘               └───┘     └───┘
chain   C         A         B         A         D         B         A

This is the form in which we are informed about candidate chains from the
consensus layer, the combination of a chain and the peer it is from. This
makes sense, since these things change independently.

We will process the chains in this form, keeping the peer/chain combination all
the way through. Although there could in principle be some opportunistic saving
by sharing when multiple peers provide the same chain, taking advantage of this
adds complexity and does nothing to improve our worst case costs.

We are only interested in candidate chains that are strictly longer than our
current chain. So our first task is to filter down to this set.
-}


-- | Keep only those candidate chains that are preferred over the current
-- chain. Typically, this means that their length is longer than the length of
-- the current chain.
--
-- No entry is removed from the list: a candidate for which the predicate
-- returns 'False' is replaced by @Left 'FetchDeclineChainNotPlausible'@, and
-- its peer info is kept. The predicate is applied as
-- @plausibleCandidateChain currentChain candidate@.
--
filterPlausibleCandidates
  :: (AnchoredFragment block -> AnchoredFragment header -> Bool)
  -> AnchoredFragment block  -- ^ The current chain
  -> [(AnchoredFragment header, peerinfo)]
  -> [(FetchDecision (AnchoredFragment header), peerinfo)]
filterPlausibleCandidates plausibleCandidateChain currentChain chains =
    [ (chain', peer)
    | (chain,  peer) <- chains
    , let chain' = do
            -- 'guard' runs in 'Maybe' here; '?!' turns its 'Nothing' into
            -- the decline reason.
            guard (plausibleCandidateChain currentChain chain)
              ?! FetchDeclineChainNotPlausible
            return chain
    ]


{-
In the example, this leaves us with only the candidate chains: A, B and C, but
still paired up with the various peers.


peer    1         2         3         4                   6         7
      ┆   ┆     ┆   ┆     ┆   ┆     ┆   ┆               ┆   ┆     ┆   ┆
      ├───┤     ├───┤     ├───┤     ├───┤               ├───┤     ├───┤
      │   │     │   │     │   │     │   │               │   │     │   │
      ├───┤     ├───┤     ├───┤     ├───┤               ├───┤     ├───┤
      │   │     │   │     │   │     │   │               │   │     │   │
    ──┼───┼─────┼───┼─────┼───┼─────┼───┼───────────────┼───┼─────┼───┼──
      │   │     │   │     │   │     │   │               │   │     │   │
      └───┘     ├───┤     ├───┤     ├───┤               ├───┤     ├───┤
                │   │     │   │     │   │               │   │     │   │
                └───┘     └───┘     └───┘               └───┘     └───┘
chain   C         A         B         A                   B         A
-}


{-
Of course we would at most need to download the blocks in a candidate chain
that are not already in the current chain. So we must find those intersections.

Before we do that, let's define how we represent a suffix of a chain. We do this
very simply as a chain fragment: exactly those blocks contained in the suffix.
A chain fragment is of course not a chain, but has many similar invariants.

We will later also need to represent chain ranges when we send block fetch
requests. We do this using a pair of points: the first and last blocks in the
range.  While we can represent an empty chain fragment, we cannot represent an
empty fetch range, but this is ok since we never request empty ranges.

 Chain fragment
    ┌───┐
    │ ◉ │ Start of range, inclusive
    ├───┤
    │   │
    ├───┤
    │   │
    ├───┤
    │   │
    ├───┤
    │ ◉ │ End of range, inclusive.
    └───┘
-}

-- | The part of a candidate chain that lies beyond its intersection with the
-- current chain.
--
-- The anchor of the wrapped fragment is a point within the bounds of the
-- current chain ('AF.withinFragmentBounds'), i.e. the candidate forks off
-- somewhere in the last @K@ blocks.
--
-- A 'ChainSuffix' is expected to be non-empty: an empty suffix would mean the
-- candidate chain equals the current chain, and such a candidate is not a
-- plausible one.
newtype ChainSuffix header = ChainSuffix
    { getChainSuffix :: AnchoredFragment header
    }

{-
We define the /chain suffix/ as the suffix of the candidate chain up until (but
not including) where it intersects the current chain.


   current    peer 1    peer 2

    ┆   ┆
    ├───┤
    │  ◀┿━━━━━━━━━━━━━━━━━┓
    ├───┤               ┌─╂─┐
    │   │               │ ◉ │
    ├───┤               ├───┤
    │   │               │   │
    ├───┤               ├───┤
    │  ◀┿━━━━━━━┓       │   │
 ───┴───┴─────┬─╂─┬─────┼───┼───
              │ ◉ │     │   │
              └───┘     ├───┤
                        │ ◉ │
                        └───┘
                C         A

In this example we found that C was a strict extension of the current chain
and chain A was a short fork.

Note that it's possible that we don't find any intersection within the last K
blocks. This means the candidate forks by more than K and so we are not
interested in this candidate at all.
-}

-- | Compute the chain suffix of a candidate chain relative to the current
-- chain.
--
-- Returns 'Nothing' when 'AF.intersect' finds no intersection between the two
-- fragments. Otherwise the candidate's part after the intersection is wrapped
-- in a 'ChainSuffix'.
--
chainForkSuffix
  :: (HasHeader header, HasHeader block,
      HeaderHash header ~ HeaderHash block)
  => AnchoredFragment block  -- ^ Current chain.
  -> AnchoredFragment header -- ^ Candidate chain
  -> Maybe (ChainSuffix header)
chainForkSuffix current candidate = do
    -- Only the last component, the candidate's suffix after the
    -- intersection point, is of interest here.
    (_, _, _, suffix) <- AF.intersect current candidate
    -- An empty suffix would mean the candidate did not extend or fork off
    -- from the current chain. Such a candidate is not plausible and must
    -- already have been filtered out before we get here.
    assert (not (AF.null suffix)) $
      pure (ChainSuffix suffix)

-- | Replace every still-accepted candidate chain by its 'ChainSuffix' with
-- respect to the current chain.
--
-- Candidates that were already declined keep their decline reason. A
-- candidate for which 'chainForkSuffix' finds no suffix is declined with
-- 'FetchDeclineChainIntersectionTooDeep'. Peer information and list order
-- are preserved.
--
selectForkSuffixes
  :: (HasHeader header, HasHeader block,
      HeaderHash header ~ HeaderHash block)
  => AnchoredFragment block  -- ^ The current chain
  -> [(FetchDecision (AnchoredFragment header), peerinfo)]
  -> [(FetchDecision (ChainSuffix      header), peerinfo)]
selectForkSuffixes current chains =
    map onDecision chains
  where
    onDecision (mchain, peer) = (mchain >>= forkSuffix, peer)

    forkSuffix chain =
      chainForkSuffix current chain ?! FetchDeclineChainIntersectionTooDeep

{-
We define the /fetch range/ as the suffix of the fork range that has not yet
had its blocks downloaded and block content checked against the headers.

    ┆   ┆
    ├───┤
    │   │
    ├───┤               ┌───┐
    │   │    already    │   │
    ├───┤    fetched    ├───┤
    │   │    blocks     │   │
    ├───┤               ├───┤
    │   │               │░◉░│  ◄  fetch range
 ───┴───┴─────┬───┬─────┼───┼───
              │░◉░│ ◄   │░░░│
              └───┘     ├───┤
                        │░◉░│  ◄
                        └───┘

In earlier versions of this scheme we maintained and relied on the invariant
that the ranges of fetched blocks are backwards closed. This meant we never had
discontinuous ranges of fetched or not-yet-fetched blocks. This invariant does
simplify things somewhat by keeping the ranges continuous, however, it precludes
fetching ranges of blocks from different peers in parallel.

We do not maintain any such invariant and so we have to deal with there being
gaps in the ranges we have already fetched or are yet to fetch. To keep the
tracking simple we do not track the ranges themselves, rather we track the set
of individual blocks without their relationship to each other.

-}

-- | Find the fragments of each chain suffix that we still need to fetch:
-- those covering blocks that, according to the supplied predicate, have not
-- been downloaded yet.
--
-- Typically the result is a single fragment forming a suffix of the chain,
-- but in the general case we can get a bunch of discontiguous chain
-- fragments.
--
-- Candidates that were already declined keep their decline reason. A
-- candidate for which nothing is left to fetch is declined with
-- 'FetchDeclineAlreadyFetched'.
--
filterNotAlreadyFetched
  :: (HasHeader header, HeaderHash header ~ HeaderHash block)
  => (Point block -> Bool)
     -- ^ Has the block at this point already been downloaded?
  -> MaxSlotNo
     -- ^ Slot bound handed to 'filterWithMaxSlotNo'
  -> [(FetchDecision (ChainSuffix        header), peerinfo)]
  -> [(FetchDecision (CandidateFragments header), peerinfo)]
filterNotAlreadyFetched alreadyDownloaded fetchedMaxSlotNo chains =
    [ (mcandidate >>= remaining, peer)
    | (mcandidate, peer) <- chains
    ]
  where
    -- Pair the suffix with whatever is left of it, or decline when every
    -- block in it has been fetched already.
    remaining suffix =
      case filterWithMaxSlotNo stillNeeded
                               fetchedMaxSlotNo
                               (getChainSuffix suffix) of
        []        -> Left FetchDeclineAlreadyFetched
        fragments -> Right (suffix, fragments)

    -- The predicate is phrased over block points, so the header's point has
    -- to be cast before asking.
    stillNeeded hdr = not (alreadyDownloaded (castPoint (blockPoint hdr)))


-- | Remove from each peer's candidate fragments the blocks that are already
-- in-flight with that same peer.
--
-- Each fragment is filtered with 'filterWithMaxSlotNo', using the peer's own
-- 'peerFetchBlocksInFlight' set for the predicate and its
-- 'peerFetchMaxSlotNo' as the slot bound. Candidates that were already
-- declined keep their decline reason. A candidate for which nothing remains
-- is declined with 'FetchDeclineInFlightThisPeer'.
--
filterNotAlreadyInFlightWithPeer
  :: HasHeader header
  => [(FetchDecision (CandidateFragments header), PeerFetchInFlight header,
                                                  peerinfo)]
  -> [(FetchDecision (CandidateFragments header), peerinfo)]
filterNotAlreadyInFlightWithPeer chains =
    map perPeer chains
  where
    perPeer (mcandidatefragments, inflight, peer) =
        (mcandidatefragments >>= prune, peer)
      where
        prune (candidate, chainfragments) =
          case concatMap keepNotInFlight chainfragments of
            []        -> Left FetchDeclineInFlightThisPeer
            fragments -> Right (candidate, fragments)

        keepNotInFlight =
          filterWithMaxSlotNo notInFlight (peerFetchMaxSlotNo inflight)

        notInFlight hdr =
          Set.notMember (blockPoint hdr) (peerFetchBlocksInFlight inflight)


-- | A penultimate step of filtering, but this time across peers, rather than
-- individually for each peer.
--
-- In 'FetchModeDeadline' the decisions are passed through unchanged. In
-- 'FetchModeBulkSync' blocks that are already in-flight are removed from
-- every peer's fragments; a peer left with nothing to fetch is declined with
-- 'FetchDeclineInFlightOtherPeer'.
--
-- Note that this does /not/ cover blocks that are proposed to be fetched in
-- this round of decisions. That step is covered in 'fetchRequestDecisions'.
--
filterNotAlreadyInFlightWithOtherPeers
  :: HasHeader header
  => FetchMode
  -> [( FetchDecision [AnchoredFragment header]
      , PeerFetchStatus header
      , PeerFetchInFlight header
      , peerinfo )]
  -> [(FetchDecision [AnchoredFragment header], peerinfo)]
filterNotAlreadyInFlightWithOtherPeers mode chains =
    case mode of
      FetchModeDeadline ->
        [ (mchainfragments, peer)
        | (mchainfragments, _, _, peer) <- chains ]

      FetchModeBulkSync ->
        [ (mchainfragments >>= prune, peer)
        | (mchainfragments, _, _, peer) <- chains ]
  where
    prune chainfragments =
      case concatMap keepNotInFlight chainfragments of
        []        -> Left FetchDeclineInFlightOtherPeer
        fragments -> Right fragments

    keepNotInFlight =
      filterWithMaxSlotNo notInFlight highestSlotInFlight

    notInFlight hdr = Set.notMember (blockPoint hdr) blocksInFlight

    -- All the blocks that are already in-flight, taken over all peers in the
    -- input except those that are shut down, still starting or aberrant.
    blocksInFlight =
      Set.unions
        [ countedBlocks status inflight
        | (_, status, inflight, _) <- chains ]

    countedBlocks PeerFetchStatusShutdown _        = Set.empty
    countedBlocks PeerFetchStatusStarting _        = Set.empty
    countedBlocks PeerFetchStatusAberrant _        = Set.empty
    countedBlocks _other                  inflight =
      peerFetchBlocksInFlight inflight

    -- The highest slot number that is or has been in flight for any peer
    -- (regardless of the peer's status).
    highestSlotInFlight =
      List.foldl' max NoMaxSlotNo
        (map (\(_, _, inflight, _) -> peerFetchMaxSlotNo inflight) chains)

-- | Filter a fragment. This is an optimised variant that will behave the same
-- as 'AnchoredFragment.filter' if the following precondition is satisfied:
--
-- PRECONDITION: for all @hdr@ in the chain fragment: if @blockSlot hdr >
-- maxSlotNo@ then the predicate should not hold for any header after @hdr@ in
-- the chain fragment.
--
-- The filtering is delegated to 'AF.filterWithStop', with "the header's slot
-- is greater than @maxSlotNo@" as the stop condition.
--
-- For example, when filtering out already downloaded blocks from the
-- fragment, it does not make sense to keep filtering after having encountered
-- the highest slot number the ChainDB has seen so far: blocks with a greater
-- slot number cannot have been downloaded yet. When the candidate fragments
-- get far ahead of the current chain, e.g., @2k@ headers, this optimisation
-- avoids the linear cost of filtering these headers when we know in advance
-- they will all remain in the final fragment. In case the given slot number
-- is 'NoMaxSlotNo', no filtering takes place, as there should be no matches
-- because we haven't downloaded any blocks yet.
--
-- Likewise, when filtering out blocks already in-flight for the given peer,
-- the given @maxSlotNo@ can correspond to the block with the highest slot
-- number that so far has been in-flight for the given peer. When no blocks
-- have been in-flight yet, @maxSlotNo@ can be 'NoMaxSlotNo', in which case no
-- filtering needs to take place, which makes sense, as there are no blocks to
-- filter out. Note that this is conservative: a block whose slot number
-- matches @maxSlotNo@ is still run through the predicate even if it is no
-- longer in-flight (maybe it has to be redownloaded). This is fine: according
-- to the predicate the block is not in-flight, so it is kept in the result.
filterWithMaxSlotNo
  :: forall header. HasHeader header
  => (header -> Bool)
  -> MaxSlotNo  -- ^ @maxSlotNo@
  -> AnchoredFragment header
  -> [AnchoredFragment header]
filterWithMaxSlotNo p maxSlotNo =
    AF.filterWithStop p beyondMaxSlot
  where
    -- Stop condition: this header's slot lies strictly above the bound.
    beyondMaxSlot :: header -> Bool
    beyondMaxSlot hdr = MaxSlotNo (blockSlot hdr) > maxSlotNo

prioritisePeerChains
  :: forall extra header peer.
   ( HasHeader header
   , Hashable peer
   , Ord peer
   )
  => FetchMode
  -> Int
  -> (AnchoredFragment header -> AnchoredFragment header -> Ordering)
  -> (header -> SizeInBytes)
  -> [(FetchDecision (CandidateFragments header), PeerFetchInFlight header,
                                                  PeerGSV,
                                                  peer,
                                                  extra )]
  -> [(FetchDecision [AnchoredFragment header],   extra)]
prioritisePeerChains :: forall extra header peer.
(HasHeader header, Hashable peer, Ord peer) =>
FetchMode
-> Int
-> (AnchoredFragment header -> AnchoredFragment header -> Ordering)
-> (header -> SizeInBytes)
-> [(FetchDecision (CandidateFragments header),
     PeerFetchInFlight header, PeerGSV, peer, extra)]
-> [(FetchDecision [AnchoredFragment header], extra)]
prioritisePeerChains FetchMode
FetchModeDeadline Int
salt AnchoredFragment header -> AnchoredFragment header -> Ordering
compareCandidateChains header -> SizeInBytes
blockFetchSize =
    ((Either
    FetchDecline
    (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
  extra)
 -> (FetchDecision [AnchoredFragment header], extra))
-> [(Either
       FetchDecline
       (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
     extra)]
-> [(FetchDecision [AnchoredFragment header], extra)]
forall a b. (a -> b) -> [a] -> [b]
map (\(Either
  FetchDecline
  (ProbabilityBand, ChainSuffix header, [AnchoredFragment header])
decision, extra
peer) ->
            (((ProbabilityBand, ChainSuffix header, [AnchoredFragment header])
 -> [AnchoredFragment header])
-> Either
     FetchDecline
     (ProbabilityBand, ChainSuffix header, [AnchoredFragment header])
-> FetchDecision [AnchoredFragment header]
forall a b.
(a -> b) -> Either FetchDecline a -> Either FetchDecline b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (\(ProbabilityBand
_,ChainSuffix header
_,[AnchoredFragment header]
fragment) -> [AnchoredFragment header]
fragment) Either
  FetchDecline
  (ProbabilityBand, ChainSuffix header, [AnchoredFragment header])
decision, extra
peer))
  ([(Either
     FetchDecline
     (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
   extra)]
 -> [(FetchDecision [AnchoredFragment header], extra)])
-> ([(FetchDecision (CandidateFragments header),
      PeerFetchInFlight header, PeerGSV, peer, extra)]
    -> [(Either
           FetchDecline
           (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
         extra)])
-> [(FetchDecision (CandidateFragments header),
     PeerFetchInFlight header, PeerGSV, peer, extra)]
-> [(FetchDecision [AnchoredFragment header], extra)]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ([(Either
     FetchDecline
     (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
   extra)]
 -> [(Either
        FetchDecline
        (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
      extra)])
-> [[(Either
        FetchDecline
        (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
      extra)]]
-> [(Either
       FetchDecline
       (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
     extra)]
forall (t :: * -> *) a b. Foldable t => (a -> [b]) -> t a -> [b]
concatMap ( [[(Either
     FetchDecline
     (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
   extra)]]
-> [(Either
       FetchDecline
       (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
     extra)]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat
              ([[(Either
      FetchDecline
      (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
    extra)]]
 -> [(Either
        FetchDecline
        (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
      extra)])
-> ([(Either
        FetchDecline
        (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
      extra)]
    -> [[(Either
            FetchDecline
            (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
          extra)]])
-> [(Either
       FetchDecline
       (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
     extra)]
-> [(Either
       FetchDecline
       (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
     extra)]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [[(Either
     FetchDecline
     (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
   extra)]]
-> [[(Either
        FetchDecline
        (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
      extra)]]
forall a. [[a]] -> [[a]]
transpose
              ([[(Either
      FetchDecline
      (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
    extra)]]
 -> [[(Either
         FetchDecline
         (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
       extra)]])
-> ([(Either
        FetchDecline
        (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
      extra)]
    -> [[(Either
            FetchDecline
            (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
          extra)]])
-> [(Either
       FetchDecline
       (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
     extra)]
-> [[(Either
        FetchDecline
        (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
      extra)]]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((Either
    FetchDecline
    (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
  extra)
 -> (Either
       FetchDecline
       (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
     extra)
 -> Bool)
-> [(Either
       FetchDecline
       (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
     extra)]
-> [[(Either
        FetchDecline
        (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
      extra)]]
forall a. (a -> a -> Bool) -> [a] -> [[a]]
groupBy (Equating
  (Either
     FetchDecline
     (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]))
-> (Either
      FetchDecline
      (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
    extra)
-> (Either
      FetchDecline
      (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
    extra)
-> Bool
forall a b. Equating a -> Equating (a, b)
equatingFst
                          (Equating
  (ProbabilityBand, ChainSuffix header, [AnchoredFragment header])
-> Equating
     (Either
        FetchDecline
        (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]))
forall b a. Equating b -> Equating (Either a b)
equatingRight
                            (Point header -> Point header -> Bool
forall a. Eq a => a -> a -> Bool
(==) (Point header -> Point header -> Bool)
-> ((ProbabilityBand, ChainSuffix header,
     [AnchoredFragment header])
    -> Point header)
-> Equating
     (ProbabilityBand, ChainSuffix header, [AnchoredFragment header])
forall b c a. (b -> b -> c) -> (a -> b) -> a -> a -> c
`on` (ProbabilityBand, ChainSuffix header, [AnchoredFragment header])
-> Point header
forall {block} {a} {c}.
HasHeader block =>
(a, ChainSuffix block, c) -> Point block
chainHeadPoint)))
              ([(Either
     FetchDecline
     (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
   extra)]
 -> [[(Either
         FetchDecline
         (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
       extra)]])
-> ([(Either
        FetchDecline
        (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
      extra)]
    -> [(Either
           FetchDecline
           (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
         extra)])
-> [(Either
       FetchDecline
       (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
     extra)]
-> [[(Either
        FetchDecline
        (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
      extra)]]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((Either
    FetchDecline
    (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
  extra)
 -> (Either
       FetchDecline
       (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
     extra)
 -> Ordering)
-> [(Either
       FetchDecline
       (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
     extra)]
-> [(Either
       FetchDecline
       (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
     extra)]
forall a. (a -> a -> Ordering) -> [a] -> [a]
sortBy  (Comparing
  (Either
     FetchDecline
     (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]))
-> (Either
      FetchDecline
      (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
    extra)
-> (Either
      FetchDecline
      (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
    extra)
-> Ordering
forall a b. Comparing a -> Comparing (a, b)
comparingFst
                          (Comparing
  (ProbabilityBand, ChainSuffix header, [AnchoredFragment header])
-> Comparing
     (Either
        FetchDecline
        (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]))
forall b a. Comparing b -> Comparing (Either a b)
comparingRight
                            (Point header -> Point header -> Ordering
forall a. Ord a => a -> a -> Ordering
compare (Point header -> Point header -> Ordering)
-> ((ProbabilityBand, ChainSuffix header,
     [AnchoredFragment header])
    -> Point header)
-> Comparing
     (ProbabilityBand, ChainSuffix header, [AnchoredFragment header])
forall b c a. (b -> b -> c) -> (a -> b) -> a -> a -> c
`on` (ProbabilityBand, ChainSuffix header, [AnchoredFragment header])
-> Point header
forall {block} {a} {c}.
HasHeader block =>
(a, ChainSuffix block, c) -> Point block
chainHeadPoint)))
              )
  ([[(Either
      FetchDecline
      (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
    extra)]]
 -> [(Either
        FetchDecline
        (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
      extra)])
-> ([(FetchDecision (CandidateFragments header),
      PeerFetchInFlight header, PeerGSV, peer, extra)]
    -> [[(Either
            FetchDecline
            (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
          extra)]])
-> [(FetchDecision (CandidateFragments header),
     PeerFetchInFlight header, PeerGSV, peer, extra)]
-> [(Either
       FetchDecline
       (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
     extra)]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((Either
    FetchDecline
    (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
  extra)
 -> (Either
       FetchDecline
       (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
     extra)
 -> Bool)
-> [(Either
       FetchDecline
       (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
     extra)]
-> [[(Either
        FetchDecline
        (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
      extra)]]
forall a. (a -> a -> Bool) -> [a] -> [[a]]
groupBy (Equating
  (Either
     FetchDecline
     (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]))
-> (Either
      FetchDecline
      (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
    extra)
-> (Either
      FetchDecline
      (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
    extra)
-> Bool
forall a b. Equating a -> Equating (a, b)
equatingFst
              (Equating
  (ProbabilityBand, ChainSuffix header, [AnchoredFragment header])
-> Equating
     (Either
        FetchDecline
        (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]))
forall b a. Equating b -> Equating (Either a b)
equatingRight
                (Equating ProbabilityBand
-> Equating (ChainSuffix header)
-> Equating (ProbabilityBand, ChainSuffix header)
forall a b. Equating a -> Equating b -> Equating (a, b)
equatingPair
                   -- compare on probability band first, then preferred chain
                   Equating ProbabilityBand
forall a. Eq a => a -> a -> Bool
(==)
                   (AnchoredFragment header -> AnchoredFragment header -> Bool
equateCandidateChains (AnchoredFragment header -> AnchoredFragment header -> Bool)
-> (ChainSuffix header -> AnchoredFragment header)
-> Equating (ChainSuffix header)
forall b c a. (b -> b -> c) -> (a -> b) -> a -> a -> c
`on` ChainSuffix header -> AnchoredFragment header
forall header. ChainSuffix header -> AnchoredFragment header
getChainSuffix)
                 Equating (ProbabilityBand, ChainSuffix header)
-> ((ProbabilityBand, ChainSuffix header,
     [AnchoredFragment header])
    -> (ProbabilityBand, ChainSuffix header))
-> Equating
     (ProbabilityBand, ChainSuffix header, [AnchoredFragment header])
forall b c a. (b -> b -> c) -> (a -> b) -> a -> a -> c
`on`
                   (\(ProbabilityBand
band, ChainSuffix header
chain, [AnchoredFragment header]
_fragments) -> (ProbabilityBand
band, ChainSuffix header
chain)))))
  ([(Either
     FetchDecline
     (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
   extra)]
 -> [[(Either
         FetchDecline
         (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
       extra)]])
-> ([(FetchDecision (CandidateFragments header),
      PeerFetchInFlight header, PeerGSV, peer, extra)]
    -> [(Either
           FetchDecline
           (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
         extra)])
-> [(FetchDecision (CandidateFragments header),
     PeerFetchInFlight header, PeerGSV, peer, extra)]
-> [[(Either
        FetchDecline
        (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
      extra)]]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((Either
    FetchDecline
    (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
  extra)
 -> (Either
       FetchDecline
       (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
     extra)
 -> Ordering)
-> [(Either
       FetchDecline
       (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
     extra)]
-> [(Either
       FetchDecline
       (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
     extra)]
forall a. (a -> a -> Ordering) -> [a] -> [a]
sortBy  (((Either
    FetchDecline
    (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
  extra)
 -> (Either
       FetchDecline
       (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
     extra)
 -> Ordering)
-> (Either
      FetchDecline
      (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
    extra)
-> (Either
      FetchDecline
      (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
    extra)
-> Ordering
forall a. Comparing a -> Comparing a
descendingOrder
              (Comparing
  (Either
     FetchDecline
     (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]))
-> (Either
      FetchDecline
      (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
    extra)
-> (Either
      FetchDecline
      (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
    extra)
-> Ordering
forall a b. Comparing a -> Comparing (a, b)
comparingFst
                (Comparing
  (ProbabilityBand, ChainSuffix header, [AnchoredFragment header])
-> Comparing
     (Either
        FetchDecline
        (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]))
forall b a. Comparing b -> Comparing (Either a b)
comparingRight
                  (Comparing ProbabilityBand
-> Comparing (ChainSuffix header)
-> Comparing (ProbabilityBand, ChainSuffix header)
forall a b. Comparing a -> Comparing b -> Comparing (a, b)
comparingPair
                     -- compare on probability band first, then preferred chain
                     Comparing ProbabilityBand
forall a. Ord a => a -> a -> Ordering
compare
                     (AnchoredFragment header -> AnchoredFragment header -> Ordering
compareCandidateChains (AnchoredFragment header -> AnchoredFragment header -> Ordering)
-> (ChainSuffix header -> AnchoredFragment header)
-> Comparing (ChainSuffix header)
forall b c a. (b -> b -> c) -> (a -> b) -> a -> a -> c
`on` ChainSuffix header -> AnchoredFragment header
forall header. ChainSuffix header -> AnchoredFragment header
getChainSuffix)
                   Comparing (ProbabilityBand, ChainSuffix header)
-> ((ProbabilityBand, ChainSuffix header,
     [AnchoredFragment header])
    -> (ProbabilityBand, ChainSuffix header))
-> Comparing
     (ProbabilityBand, ChainSuffix header, [AnchoredFragment header])
forall b c a. (b -> b -> c) -> (a -> b) -> a -> a -> c
`on`
                      (\(ProbabilityBand
band, ChainSuffix header
chain, [AnchoredFragment header]
_fragments) -> (ProbabilityBand
band, ChainSuffix header
chain))))))
  ([(Either
     FetchDecline
     (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
   extra)]
 -> [(Either
        FetchDecline
        (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
      extra)])
-> ([(FetchDecision (CandidateFragments header),
      PeerFetchInFlight header, PeerGSV, peer, extra)]
    -> [(Either
           FetchDecline
           (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
         extra)])
-> [(FetchDecision (CandidateFragments header),
     PeerFetchInFlight header, PeerGSV, peer, extra)]
-> [(Either
       FetchDecline
       (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
     extra)]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((FetchDecision (CandidateFragments header),
  PeerFetchInFlight header, PeerGSV, peer, extra)
 -> (Either
       FetchDecline
       (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
     extra))
-> [(FetchDecision (CandidateFragments header),
     PeerFetchInFlight header, PeerGSV, peer, extra)]
-> [(Either
       FetchDecline
       (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
     extra)]
forall a b. (a -> b) -> [a] -> [b]
map (FetchDecision (CandidateFragments header),
 PeerFetchInFlight header, PeerGSV, peer, extra)
-> (Either
      FetchDecline
      (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
    extra)
annotateProbabilityBand
  ([(FetchDecision (CandidateFragments header),
   PeerFetchInFlight header, PeerGSV, peer, extra)]
 -> [(Either
        FetchDecline
        (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
      extra)])
-> ([(FetchDecision (CandidateFragments header),
      PeerFetchInFlight header, PeerGSV, peer, extra)]
    -> [(FetchDecision (CandidateFragments header),
         PeerFetchInFlight header, PeerGSV, peer, extra)])
-> [(FetchDecision (CandidateFragments header),
     PeerFetchInFlight header, PeerGSV, peer, extra)]
-> [(Either
       FetchDecline
       (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
     extra)]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((FetchDecision (CandidateFragments header),
  PeerFetchInFlight header, PeerGSV, peer, extra)
 -> (FetchDecision (CandidateFragments header),
     PeerFetchInFlight header, PeerGSV, peer, extra)
 -> Ordering)
-> [(FetchDecision (CandidateFragments header),
     PeerFetchInFlight header, PeerGSV, peer, extra)]
-> [(FetchDecision (CandidateFragments header),
     PeerFetchInFlight header, PeerGSV, peer, extra)]
forall a. (a -> a -> Ordering) -> [a] -> [a]
sortBy (\(FetchDecision (CandidateFragments header)
_,PeerFetchInFlight header
_,PeerGSV
a,peer
ap,extra
_) (FetchDecision (CandidateFragments header)
_,PeerFetchInFlight header
_,PeerGSV
b,peer
bp,extra
_) ->
      Int -> (PeerGSV, peer) -> (PeerGSV, peer) -> Ordering
forall peer.
(Hashable peer, Ord peer) =>
Int -> (PeerGSV, peer) -> (PeerGSV, peer) -> Ordering
comparePeerGSV' Int
salt (PeerGSV
a,peer
ap) (PeerGSV
b,peer
bp))
  where
    annotateProbabilityBand :: (FetchDecision (CandidateFragments header),
 PeerFetchInFlight header, PeerGSV, peer, extra)
-> (Either
      FetchDecline
      (ProbabilityBand, ChainSuffix header, [AnchoredFragment header]),
    extra)
annotateProbabilityBand (Left FetchDecline
decline, PeerFetchInFlight header
_, PeerGSV
_, peer
_, extra
peer) = (FetchDecline
-> Either
     FetchDecline
     (ProbabilityBand, ChainSuffix header, [AnchoredFragment header])
forall a b. a -> Either a b
Left FetchDecline
decline, extra
peer)
    annotateProbabilityBand (Right (ChainSuffix header
chain,[AnchoredFragment header]
fragments), PeerFetchInFlight header
inflight, PeerGSV
gsvs, peer
_, extra
peer) =
        ((ProbabilityBand, ChainSuffix header, [AnchoredFragment header])
-> Either
     FetchDecline
     (ProbabilityBand, ChainSuffix header, [AnchoredFragment header])
forall a b. b -> Either a b
Right (ProbabilityBand
band, ChainSuffix header
chain, [AnchoredFragment header]
fragments), extra
peer)
      where
        band :: ProbabilityBand
band = Double -> ProbabilityBand
probabilityBand (Double -> ProbabilityBand) -> Double -> ProbabilityBand
forall a b. (a -> b) -> a -> b
$
                 PeerGSV -> SizeInBytes -> SizeInBytes -> DiffTime -> Double
estimateResponseDeadlineProbability
                   PeerGSV
gsvs
                   (PeerFetchInFlight header -> SizeInBytes
forall header. PeerFetchInFlight header -> SizeInBytes
peerFetchBytesInFlight PeerFetchInFlight header
inflight)
                   ((header -> SizeInBytes) -> [AnchoredFragment header] -> SizeInBytes
forall header.
(header -> SizeInBytes) -> [AnchoredFragment header] -> SizeInBytes
totalFetchSize header -> SizeInBytes
blockFetchSize [AnchoredFragment header]
fragments)
                   DiffTime
deadline

    deadline :: DiffTime
deadline = DiffTime
2 -- seconds -- TODO: get this from external info

    equateCandidateChains :: AnchoredFragment header -> AnchoredFragment header -> Bool
equateCandidateChains AnchoredFragment header
chain1 AnchoredFragment header
chain2
      | Ordering
EQ <- AnchoredFragment header -> AnchoredFragment header -> Ordering
compareCandidateChains AnchoredFragment header
chain1 AnchoredFragment header
chain2 = Bool
True
      | Bool
otherwise                                  = Bool
False

    chainHeadPoint :: (a, ChainSuffix block, c) -> Point block
chainHeadPoint (a
_,ChainSuffix AnchoredFragment block
c,c
_) = AnchoredFragment block -> Point block
forall block.
HasHeader block =>
AnchoredFragment block -> Point block
AF.headPoint AnchoredFragment block
c

-- Bulk sync mode.
--
-- The input is first sorted by 'comparePeerGSV'' (salted), then each entry is
-- annotated with a duration estimate and sorted again.  'sortBy' is stable,
-- so entries that tie in the second sort keep their GSV-based relative order.
--
-- In the second sort declined entries ('Left') come before accepted ones
-- ('Right') and compare equal to one another (see 'comparingRight').
-- Accepted entries are ordered by 'compareCandidateChains' on their chain
-- suffix first, and by the estimated duration second.
--
-- Finally the annotation and the chain suffix are dropped, leaving only the
-- fragments to fetch alongside the caller's @extra@ value.
prioritisePeerChains FetchModeBulkSync salt compareCandidateChains blockFetchSize =
    map (\(decision, extra) ->
            (fmap (\(_, _, fragments) -> fragments) decision, extra))
  . sortBy (comparingFst
             (comparingRight
               (comparingPair
                  -- compare on preferred chain first, then duration
                  (compareCandidateChains `on` getChainSuffix)
                  compare
                `on`
                  (\(duration, chain, _fragments) -> (chain, duration)))))
  . map annotateDuration
  . sortBy (\(_,_,a,ap,_) (_,_,b,bp,_) ->
      comparePeerGSV' salt (a,ap) (b,bp))
  where
    -- Replace the 5-tuple by a pair: the decision (annotated with a duration
    -- when it is a 'Right') and the @extra@ component.  The in-flight state,
    -- the GSV and the peer are only needed up to this point.
    annotateDuration (Left decline, _, _, _, extra) = (Left decline, extra)
    annotateDuration (Right (chain,fragments), inflight, gsvs, _, extra) =
        (Right (duration, chain, fragments), extra)
      where
        -- TODO: consider if we should put this into bands rather than just
        -- taking the full value.
        --
        -- Result of 'estimateExpectedResponseDuration' given the peer's GSV,
        -- the bytes already in flight with it, and the total size of the
        -- fragments we would request.
        duration = estimateExpectedResponseDuration
                     gsvs
                     (peerFetchBytesInFlight inflight)
                     (totalFetchSize blockFetchSize fragments)

-- | Total size of a list of fragments: the sum of @blockFetchSize@ applied to
-- every header of every fragment.
--
-- An empty list of fragments (or only empty fragments) yields @0@.
totalFetchSize :: (header -> SizeInBytes)
               -- ^ size of the block corresponding to a header
               -> [AnchoredFragment header]
               -- ^ fragments whose blocks are to be accounted for
               -> SizeInBytes
totalFetchSize blockFetchSize fragments =
  sum [ blockFetchSize header
      | fragment <- fragments
      , header   <- AF.toOldestFirst fragment ]

-- | A comparison function on @a@, i.e. the shape expected by 'sortBy'.
type Comparing a = a -> a -> Ordering
-- | An equality test on @a@, i.e. the shape expected by 'groupBy'.
type Equating  a = a -> a -> Bool

-- | Reverse the order induced by a comparison function by swapping its
-- arguments.
descendingOrder :: Comparing a -> Comparing a
descendingOrder cmp = flip cmp

-- | Lexicographic comparison of pairs: compare the first components with
-- @cmpA@ and, only if they are 'EQ', fall back to comparing the second
-- components with @cmpB@ (this is what '<>' on 'Ordering' does).
comparingPair :: Comparing a -> Comparing b -> Comparing (a, b)
comparingPair cmpA cmpB (a1, b1) (a2, b2) = cmpA a1 a2 <> cmpB b1 b2

-- | Equality of pairs: both the first components (under @eqA@) and the second
-- components (under @eqB@) must be equal.
equatingPair :: Equating a -> Equating b -> Equating (a, b)
equatingPair eqA eqB (a1, b1) (a2, b2) = eqA a1 a2 && eqB b1 b2

-- | Comparison of 'Either' values: every 'Left' is ordered before every
-- 'Right'; two 'Left's are compared with @cmpA@ and two 'Right's with @cmpB@.
comparingEither :: Comparing a -> Comparing b -> Comparing (Either a b)
comparingEither _ _    (Left  _) (Right _) = LT
comparingEither cmpA _ (Left  x) (Left  y) = cmpA x y
comparingEither _ cmpB (Right x) (Right y) = cmpB x y
comparingEither _ _    (Right _) (Left  _) = GT

-- | Equality of 'Either' values: a 'Left' never equals a 'Right'; two 'Left's
-- are tested with @eqA@ and two 'Right's with @eqB@.
equatingEither :: Equating a -> Equating b -> Equating (Either a b)
equatingEither _ _   (Left  _) (Right _) = False
equatingEither eqA _ (Left  x) (Left  y) = eqA x y
equatingEither _ eqB (Right x) (Right y) = eqB x y
equatingEither _ _   (Right _) (Left  _) = False

-- | Compare pairs by their first component only; the second component is
-- ignored.
comparingFst :: Comparing a -> Comparing (a, b)
comparingFst cmp = cmp `on` fst

-- | Test pairs for equality on their first component only; the second
-- component is ignored.
equatingFst :: Equating a -> Equating (a, b)
equatingFst eq = eq `on` fst

-- | Compare 'Either' values looking only inside 'Right's.
--
-- This is 'comparingEither' with 'mempty' as the comparison for 'Left's; at
-- type @a -> a -> Ordering@ that is the comparison that always answers 'EQ'.
-- So all 'Left's are equal to each other and ordered before every 'Right'.
comparingRight :: Comparing b -> Comparing (Either a b)
comparingRight = comparingEither mempty

-- | Test 'Either' values for equality looking only inside 'Right's.
--
-- This is 'equatingEither' with an always-'True' test for 'Left's: any two
-- 'Left's are considered equal, and a 'Left' never equals a 'Right'.
equatingRight :: Equating b -> Equating (Either a b)
equatingRight = equatingEither (\_ _ -> True)

-- | Given the probability of the download completing within the deadline,
-- classify that into one of three broad bands: high, moderate and low.
--
-- The bands are
--
-- * high:      98% -- 100%   (@p > 0.98@)
-- * moderate:  75% --  98%   (@0.75 < p <= 0.98@)
-- * low:        0% --  75%   (@p <= 0.75@)
--
-- Both thresholds are exclusive: exactly @0.98@ is moderate and exactly
-- @0.75@ is low.
--
probabilityBand :: Double -> ProbabilityBand
probabilityBand p
  | p > 0.98  = ProbabilityHigh
  | p > 0.75  = ProbabilityModerate
  | otherwise = ProbabilityLow
 -- TODO: for hysteresis, increase probability if we're already using this peer

-- | Coarse classification of a probability into three bands.
--
-- The constructors are listed from lowest to highest, so the derived 'Ord'
-- instance gives @ProbabilityLow < ProbabilityModerate < ProbabilityHigh@.
data ProbabilityBand = ProbabilityLow
                     | ProbabilityModerate
                     | ProbabilityHigh
  deriving (Eq, Ord, Show)


{-
In the second phase we walk over the prioritised fetch suffixes for each peer
and make a decision about whether we should initiate any new fetch requests.

This decision is based on a number of factors:

 * Is the fetch suffix empty? If so, there's nothing to do.
 * Do we already have block fetch requests in flight with this peer?
 * If so are we under the maximum number of in-flight blocks for this peer?
 * Is this peer still performing within expectations or has it missed any soft
   timeouts?
 * Has the peer missed any hard timeouts or otherwise been disconnected?
 * Are we at our soft or hard limit of the number of peers we are prepared to
   fetch blocks from concurrently?

We look at each peer chain fetch suffix one by one. Of course decisions we
make earlier can affect decisions later, in particular the number of peers we
fetch from concurrently can increase if we fetch from a new peer, and we must
obviously take that into account when considering later peer chains.
-}


-- | Turn the per-peer candidate fragments into per-peer fetch decisions.
--
-- Peers are processed in the order given. While walking the list we thread
-- through three pieces of state:
--
-- * the number of peers we are (or will be) fetching from concurrently,
--   which grows whenever a previously idle peer is handed a request;
--
-- * the set of block points already requested earlier in this round; and
--
-- * the maximum slot number already requested earlier in this round.
--
-- In 'FetchModeBulkSync' the latter two are used to strip blocks that were
-- already assigned to an earlier peer in this round; in 'FetchModeDeadline'
-- the candidate fragments are passed on unchanged.
--
-- The @extra@ component of each input tuple is returned untouched alongside
-- the decision for that peer.
fetchRequestDecisions
  :: forall extra header peer.
      ( Hashable peer
      , HasHeader header
      , Ord peer
      )
  => FetchDecisionPolicy header
  -> FetchMode
  -> [( FetchDecision [AnchoredFragment header]
      , PeerFetchStatus header
      , PeerFetchInFlight header
      , PeerGSV
      , peer
      , extra)]
  -> [(FetchDecision (FetchRequest header), extra)]
fetchRequestDecisions fetchDecisionPolicy fetchMode chains =
    go nConcurrentFetchPeers0 Set.empty NoMaxSlotNo chains
  where
    go :: Word
       -> Set (Point header)
       -> MaxSlotNo
       -> [( FetchDecision [AnchoredFragment header]
           , PeerFetchStatus header
           , PeerFetchInFlight header
           , PeerGSV
           , peer
           , extra)]
       -> [(FetchDecision (FetchRequest header), extra)]
    go !_ !_ !_ [] = []
    go !nConcurrentFetchPeers !blocksFetchedThisRound !maxSlotNoFetchedThisRound
       ((mchainfragments, status, inflight, gsvs, peer, extra) : cps) =
        (decision, extra)
      : go nConcurrentFetchPeers'
           blocksFetchedThisRound'
           maxSlotNoFetchedThisRound'
           cps
      where
        decision :: FetchDecision (FetchRequest header)
        decision =
          fetchRequestDecision
            fetchDecisionPolicy
            fetchMode
            concurrencyForPeer
            (calculatePeerFetchInFlightLimits gsvs)
            inflight
            status
            mchainfragments'

        -- Permit the preferred peers to bypass any concurrency limits by
        -- reporting a concurrency of zero for them.
        concurrencyForPeer :: Word
        concurrencyForPeer
          | peer `elem` preferredPeers = 0
          | otherwise                  = nConcurrentFetchPeers

        -- In bulk sync mode, drop everything that an earlier peer was
        -- already asked for in this round; decline if nothing is left.
        mchainfragments' :: FetchDecision [AnchoredFragment header]
        mchainfragments' =
          case fetchMode of
            FetchModeDeadline -> mchainfragments
            FetchModeBulkSync -> do
              chainfragments <- mchainfragments
              let fragments =
                    concatMap
                      (filterWithMaxSlotNo
                         notFetchedThisRound
                         maxSlotNoFetchedThisRound)
                      chainfragments
              guard (not (null fragments)) ?! FetchDeclineInFlightOtherPeer
              return fragments

        notFetchedThisRound :: header -> Bool
        notFetchedThisRound h =
          blockPoint h `Set.notMember` blocksFetchedThisRound

        nConcurrentFetchPeers' :: Word
        nConcurrentFetchPeers'
          -- increment if it was idle, and now will not be
          | peerFetchReqsInFlight inflight == 0
          , Right{} <- decision = nConcurrentFetchPeers + 1
          | otherwise           = nConcurrentFetchPeers

        -- This is only for avoiding duplication between fetch requests in this
        -- round of decisions. Avoiding duplication with blocks that are already
        -- in flight is handled by filterNotAlreadyInFlightWithOtherPeers
        (blocksFetchedThisRound', maxSlotNoFetchedThisRound') =
          case decision of
            Left _ ->
              (blocksFetchedThisRound, maxSlotNoFetchedThisRound)
            Right (FetchRequest fragments) ->
              ( blocksFetchedThisRound `Set.union` blocksFetchedThisDecision
              , maxSlotNoFetchedThisRound `max` maxSlotNoFetchedThisDecision
              )
              where
                -- Largest head slot among the fragments just requested.
                maxSlotNoFetchedThisDecision :: MaxSlotNo
                maxSlotNoFetchedThisDecision =
                  List.foldl' max NoMaxSlotNo $
                    map MaxSlotNo $
                      mapMaybe (withOriginToMaybe . AF.headSlot) fragments

                -- Points of every block in the fragments just requested.
                blocksFetchedThisDecision :: Set (Point header)
                blocksFetchedThisDecision =
                  Set.fromList
                    [ blockPoint hdr
                    | fragment <- fragments
                    , hdr      <- AF.toOldestFirst fragment
                    ]

    -- Initial concurrency: the number of peers that are already active.
    nConcurrentFetchPeers0 :: Word
    nConcurrentFetchPeers0 = fromIntegral (Set.size activePeers)

    -- Set of peers that have at least one fetch request in flight.
    activePeers :: Set peer
    activePeers =
      Set.fromList
        [ p
        | (_, _, PeerFetchInFlight{peerFetchReqsInFlight}, _, p, _) <- chains
        , peerFetchReqsInFlight > 0
        ]

    -- Order the peers based on current PeerGSV. The top performing peers will
    -- be permitted to go active even if we're above the desired
    -- maxConcurrentFetchPeers, which will cause us to switch smoothly from
    -- slower to faster peers. When switching from slow to faster peers we will
    -- be over the configured limit, but PeerGSV is expected to be updated
    -- rather infrequently so the set of preferred peers should be stable
    -- during 10s of seconds.
    preferredPeers :: [peer]
    preferredPeers =
        map snd
      . take (fromIntegral maxConcurrentFetchPeers)
      . sortBy (comparePeerGSV activePeers (peerSalt fetchDecisionPolicy))
      $ [ (gsv, p) | (_, _, _, gsv, p, _) <- chains ]

    -- The concurrency limit that applies in the current fetch mode.
    maxConcurrentFetchPeers :: Word
    maxConcurrentFetchPeers =
      case fetchMode of
        FetchModeBulkSync -> maxConcurrencyBulkSync fetchDecisionPolicy
        FetchModeDeadline -> maxConcurrencyDeadline fetchDecisionPolicy


-- | Decide whether to send a fetch request to a single peer, and if so for
-- which blocks.
--
-- The checks are applied in this order, the first one that fires wins:
--
-- 1. an earlier decline for this peer is passed through unchanged;
-- 2. the peer is shutting down, still starting, or aberrant;
-- 3. the peer already has the maximum number of requests in flight;
-- 4. the peer's bytes in flight are at or above the high watermark;
-- 5. the peer is busy (see the comment on that guard below);
-- 6. the number of concurrent fetch peers is above the limit for the mode;
-- 7. the peer is idle and we are exactly at the concurrency limit.
--
-- Otherwise the request is built by 'selectBlocksUpToLimits'.
--
-- The 'Word' argument is the number of peers considered to be fetching
-- concurrently when this peer is evaluated.
fetchRequestDecision
  :: HasHeader header
  => FetchDecisionPolicy header
  -> FetchMode
  -> Word
  -> PeerFetchInFlightLimits
  -> PeerFetchInFlight header
  -> PeerFetchStatus header
  -> FetchDecision [AnchoredFragment header]
  -> FetchDecision (FetchRequest  header)

fetchRequestDecision _ _ _ _ _ _ (Left decline)
  = Left decline

fetchRequestDecision _ _ _ _ _ PeerFetchStatusShutdown _
  = Left FetchDeclinePeerShutdown

fetchRequestDecision _ _ _ _ _ PeerFetchStatusStarting _
  = Left FetchDeclinePeerStarting

fetchRequestDecision _ _ _ _ _ PeerFetchStatusAberrant _
  = Left FetchDeclinePeerSlow

fetchRequestDecision FetchDecisionPolicy {
                       maxConcurrencyBulkSync,
                       maxConcurrencyDeadline,
                       maxInFlightReqsPerPeer,
                       blockFetchSize
                     }
                     fetchMode
                     nConcurrentFetchPeers
                     PeerFetchInFlightLimits {
                       inFlightBytesLowWatermark,
                       inFlightBytesHighWatermark
                     }
                     PeerFetchInFlight {
                       peerFetchReqsInFlight,
                       peerFetchBytesInFlight
                     }
                     peerFetchStatus
                     (Right fetchFragments)

  | peerFetchReqsInFlight >= maxInFlightReqsPerPeer
  = Left $ FetchDeclineReqsInFlightLimit
             maxInFlightReqsPerPeer

  | peerFetchBytesInFlight >= inFlightBytesHighWatermark
  = Left $ FetchDeclineBytesInFlightLimit
             peerFetchBytesInFlight
             inFlightBytesLowWatermark
             inFlightBytesHighWatermark

    -- This covers the case when we could still fit in more reqs or bytes, but
    -- we want to let it drop below a low water mark before sending more so we
    -- get a bit more batching behaviour, rather than lots of 1-block reqs.
  | peerFetchStatus == PeerFetchStatusBusy
  = Left $ FetchDeclinePeerBusy
             peerFetchBytesInFlight
             inFlightBytesLowWatermark
             inFlightBytesHighWatermark

    -- Refuse any block request if we're above the concurrency limit.
  | nConcurrentFetchPeers > maxConcurrentFetchPeers
  = Left $ FetchDeclineConcurrencyLimit
             fetchMode maxConcurrentFetchPeers

    -- If we're at the concurrency limit refuse any additional peers.
  | peerFetchReqsInFlight == 0
  , nConcurrentFetchPeers == maxConcurrentFetchPeers
  = Left $ FetchDeclineConcurrencyLimit
             fetchMode maxConcurrentFetchPeers

    -- We've checked our request limit and our byte limit. We are then
    -- guaranteed to get at least one non-empty request range.
  | otherwise
  = assert (peerFetchReqsInFlight < maxInFlightReqsPerPeer) $
    assert (not (null fetchFragments)) $

    Right $ selectBlocksUpToLimits
              blockFetchSize
              peerFetchReqsInFlight
              maxInFlightReqsPerPeer
              peerFetchBytesInFlight
              inFlightBytesHighWatermark
              fetchFragments
  where
    -- The concurrency limit that applies in the current fetch mode; shared by
    -- both concurrency guards above.
    maxConcurrentFetchPeers :: Word
    maxConcurrentFetchPeers =
      case fetchMode of
        FetchModeBulkSync -> maxConcurrencyBulkSync
        FetchModeDeadline -> maxConcurrencyDeadline


-- | Pick blocks from the front of the given fragments, in order, until either
-- the request limit or the byte limit is reached.
--
-- Every input fragment that is started counts as one additional request, and
-- at most one output fragment is produced per input fragment.
--
-- Precondition (checked with 'assert'): the current number of requests is
-- strictly below the maximum, the current number of bytes is strictly below
-- the maximum, and the list of fragments is non-empty. Being already over a
-- limit has to be ruled out by the caller.
--
-- Property (checked with 'assert'): every fragment in the result is
-- non-empty. Given the preconditions, and provided the input fragments are
-- themselves non-empty, the result contains at least one block.
--
selectBlocksUpToLimits
  :: forall header. HasHeader header
  => (header -> SizeInBytes) -- ^ Block body size
  -> Word -- ^ Current number of requests in flight
  -> Word -- ^ Maximum number of requests in flight allowed
  -> SizeInBytes -- ^ Current number of bytes in flight
  -> SizeInBytes -- ^ Maximum number of bytes in flight allowed
  -> [AnchoredFragment header]
  -> FetchRequest header
selectBlocksUpToLimits blockFetchSize nreqs0 maxreqs nbytes0 maxbytes fragments =
    assert (nreqs0 < maxreqs && nbytes0 < maxbytes && not (null fragments)) $
    -- The case that we are already over our limits has to be checked earlier,
    -- outside of this function. From here on however we check for limits.
    assert (all (not . AF.null) fragments') $
    FetchRequest fragments'
  where
    fragments' :: [AnchoredFragment header]
    fragments' = goFrags nreqs0 nbytes0 fragments

    -- Walk over the remaining input fragments.
    --
    -- Each time we have to pick from a new discontiguous chain fragment then
    -- that will become a new request, which contributes to our in-flight
    -- request count. We never break the maxreqs limit.
    goFrags :: Word
            -> SizeInBytes
            -> [AnchoredFragment header]
            -> [AnchoredFragment header]
    goFrags _     _      []       = []
    goFrags nreqs nbytes (c : cs)
        -- Equivalent to @nreqs + 1 > maxreqs@, but cannot wrap around.
      | nreqs >= maxreqs = []
      | otherwise        =
          goFrag (nreqs + 1) nbytes (AF.Empty (AF.anchor c)) c cs

    -- Move blocks one at a time from the front of the current input fragment
    -- (fourth argument) onto the end of the fragment being built (third
    -- argument).
    --
    -- Note that we always pick the one last block that crosses the maxbytes
    -- limit. This covers the case where we otherwise wouldn't even be able to
    -- request a single block, as it's too large.
    goFrag :: Word
           -> SizeInBytes
           -> AnchoredFragment header
           -> AnchoredFragment header
           -> [AnchoredFragment header]
           -> [AnchoredFragment header]
    goFrag nreqs nbytes c' (Empty _) cs = c' : goFrags nreqs nbytes cs
    goFrag nreqs nbytes c' (b :< c)  cs
      | nbytes' >= maxbytes = [c' :> b]
      | otherwise           = goFrag nreqs nbytes' (c' :> b) c cs
      where
        nbytes' :: SizeInBytes
        nbytes' = nbytes + blockFetchSize b