{-# LANGUAGE DataKinds           #-}
{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE Rank2Types          #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies        #-}

module Ouroboros.Network.BlockFetch.Examples
  ( blockFetchExample0
  , blockFetchExample1
  , mockBlockFetchServer1
  , exampleFixedPeerGSVs
  ) where

import Codec.Serialise (Serialise (..))
import Data.ByteString.Lazy qualified as LBS
import Data.List as List (foldl')
import Data.Map (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe (fromMaybe)
import Data.Set (Set)
import Data.Set qualified as Set
import Data.Typeable (Typeable)

import Control.Concurrent.Class.MonadSTM.Strict
import Control.Exception (assert)
import Control.Monad (forever)
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadST
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer, contramap, nullTracer)

import Ouroboros.Network.AnchoredFragment (AnchoredFragment, anchorPoint)
import Ouroboros.Network.AnchoredFragment qualified as AnchoredFragment
import Ouroboros.Network.Block

import Network.TypedProtocol.Core
import Network.TypedProtocol.Pipelined

import Ouroboros.Network.ControlMessage (ControlMessageSTM)

import Ouroboros.Network.BlockFetch
import Ouroboros.Network.BlockFetch.Client
import Ouroboros.Network.Channel
import Ouroboros.Network.DeltaQ
import Ouroboros.Network.Driver
import Ouroboros.Network.NodeToNode (NodeToNodeVersion (..))
import Ouroboros.Network.NodeToNode.Version qualified as NodeToNode
import Ouroboros.Network.Protocol.BlockFetch.Codec
import Ouroboros.Network.Protocol.BlockFetch.Server
import Ouroboros.Network.Protocol.BlockFetch.Type
import Ouroboros.Network.Util.ShowProxy

import Ouroboros.Network.Mock.ConcreteBlock


-- | Run a single block fetch protocol until the chain is downloaded.
--
blockFetchExample0 :: forall m.
                      (MonadSTM m, MonadST m, MonadAsync m, MonadDelay m,
                       MonadFork m, MonadTime m, MonadTimer m, MonadMask m,
                       MonadThrow (STM m))
                   => Tracer m [TraceLabelPeer Int
                                 (FetchDecision [Point BlockHeader])]
                   -> Tracer m (TraceLabelPeer Int
                                 (TraceFetchClientState BlockHeader))
                   -> Tracer m (TraceLabelPeer Int
                                 (TraceSendRecv (BlockFetch Block (Point Block))))
                   -> Maybe DiffTime -- ^ client's channel delay
                   -> Maybe DiffTime -- ^ servers's channel delay
                   -> ControlMessageSTM m
                   -> AnchoredFragment Block -- ^ Fixed current chain
                   -> AnchoredFragment Block -- ^ Fixed candidate chain
                   -> m ()
blockFetchExample0 :: forall (m :: * -> *).
(MonadSTM m, MonadST m, MonadAsync m, MonadDelay m, MonadFork m,
 MonadTime m, MonadTimer m, MonadMask m, MonadThrow (STM m)) =>
Tracer m [TraceLabelPeer Int (FetchDecision [Point BlockHeader])]
-> Tracer
     m (TraceLabelPeer Int (TraceFetchClientState BlockHeader))
-> Tracer
     m
     (TraceLabelPeer
        Int (TraceSendRecv (BlockFetch Block (Point Block))))
-> Maybe DiffTime
-> Maybe DiffTime
-> ControlMessageSTM m
-> AnchoredFragment Block
-> AnchoredFragment Block
-> m ()
blockFetchExample0 Tracer m [TraceLabelPeer Int (FetchDecision [Point BlockHeader])]
decisionTracer Tracer m (TraceLabelPeer Int (TraceFetchClientState BlockHeader))
clientStateTracer Tracer
  m
  (TraceLabelPeer
     Int (TraceSendRecv (BlockFetch Block (Point Block))))
clientMsgTracer
                   Maybe DiffTime
clientDelay Maybe DiffTime
serverDelay
                   ControlMessageSTM m
controlMessageSTM
                   AnchoredFragment Block
currentChain AnchoredFragment Block
candidateChain = do

    registry    <- m (FetchClientRegistry Int BlockHeader Block m)
forall (m :: * -> *) peer header block.
MonadSTM m =>
m (FetchClientRegistry peer header block m)
newFetchClientRegistry :: m (FetchClientRegistry Int BlockHeader Block m)
    blockHeap   <- mkTestFetchedBlockHeap (anchoredChainPoints currentChain)

    (clientAsync, serverAsync, syncClientAsync, keepAliveAsync)
                <- runFetchClientAndServerAsync
                    (contramap (TraceLabelPeer peerno) clientMsgTracer)
                    (contramap (TraceLabelPeer peerno) serverMsgTracer)
                    (maxBound :: NodeToNodeVersion)
                    NodeToNode.isPipeliningEnabled
                    clientDelay serverDelay
                    registry peerno
                    (blockFetchClient NodeToNodeV_7 controlMessageSTM nullTracer)
                    (mockBlockFetchServer1 candidateChain)

    fetchAsync  <- async $ do
      threadId <- myThreadId
      labelThread threadId "block-fetch-logic"
      blockFetch registry blockHeap
    driverAsync <- async $ do
      threadId <- myThreadId
      labelThread threadId "driver"
      driver blockHeap

    -- Order of shutdown here is important for this example: must kill off the
    -- fetch thread before the peer threads.
    _ <- waitAnyCancel $ [ fetchAsync, driverAsync,
                           clientAsync, serverAsync,
                           syncClientAsync, keepAliveAsync]
    return ()

  where
    peerno :: Int
peerno = Int
1

    serverMsgTracer :: Tracer m a
serverMsgTracer = Tracer m a
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer

    currentChainHeaders :: AnchoredFragment BlockHeader
currentChainHeaders =
      (Block -> BlockHeader)
-> AnchoredFragment Block -> AnchoredFragment BlockHeader
forall block2 block1.
(HasHeader block2, HeaderHash block1 ~ HeaderHash block2) =>
(block1 -> block2)
-> AnchoredFragment block1 -> AnchoredFragment block2
AnchoredFragment.mapAnchoredFragment Block -> BlockHeader
blockHeader AnchoredFragment Block
currentChain

    candidateChainHeaders :: Map Int (AnchoredFragment BlockHeader)
candidateChainHeaders =
      [(Int, AnchoredFragment BlockHeader)]
-> Map Int (AnchoredFragment BlockHeader)
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList ([(Int, AnchoredFragment BlockHeader)]
 -> Map Int (AnchoredFragment BlockHeader))
-> [(Int, AnchoredFragment BlockHeader)]
-> Map Int (AnchoredFragment BlockHeader)
forall a b. (a -> b) -> a -> b
$ [Int]
-> [AnchoredFragment BlockHeader]
-> [(Int, AnchoredFragment BlockHeader)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Int
1..] ([AnchoredFragment BlockHeader]
 -> [(Int, AnchoredFragment BlockHeader)])
-> [AnchoredFragment BlockHeader]
-> [(Int, AnchoredFragment BlockHeader)]
forall a b. (a -> b) -> a -> b
$
      (AnchoredFragment Block -> AnchoredFragment BlockHeader)
-> [AnchoredFragment Block] -> [AnchoredFragment BlockHeader]
forall a b. (a -> b) -> [a] -> [b]
map ((Block -> BlockHeader)
-> AnchoredFragment Block -> AnchoredFragment BlockHeader
forall block2 block1.
(HasHeader block2, HeaderHash block1 ~ HeaderHash block2) =>
(block1 -> block2)
-> AnchoredFragment block1 -> AnchoredFragment block2
AnchoredFragment.mapAnchoredFragment Block -> BlockHeader
blockHeader) [AnchoredFragment Block
candidateChain]

    anchoredChainPoints :: AnchoredFragment block -> [Point block]
anchoredChainPoints AnchoredFragment block
c = AnchoredFragment block -> Point block
forall block. AnchoredFragment block -> Point block
anchorPoint AnchoredFragment block
c
                          Point block -> [Point block] -> [Point block]
forall a. a -> [a] -> [a]
: (block -> Point block) -> [block] -> [Point block]
forall a b. (a -> b) -> [a] -> [b]
map block -> Point block
forall block. HasHeader block => block -> Point block
blockPoint (AnchoredFragment block -> [block]
forall v a b. AnchoredSeq v a b -> [b]
AnchoredFragment.toOldestFirst AnchoredFragment block
c)

    blockFetch :: FetchClientRegistry Int BlockHeader Block m
               -> TestFetchedBlockHeap m Block
               -> m ()
    blockFetch :: FetchClientRegistry Int BlockHeader Block m
-> TestFetchedBlockHeap m Block -> m ()
blockFetch FetchClientRegistry Int BlockHeader Block m
registry TestFetchedBlockHeap m Block
blockHeap =
        Tracer m [TraceLabelPeer Int (FetchDecision [Point BlockHeader])]
-> Tracer
     m (TraceLabelPeer Int (TraceFetchClientState BlockHeader))
-> BlockFetchConsensusInterface Int BlockHeader Block m
-> FetchClientRegistry Int BlockHeader Block m
-> BlockFetchConfiguration
-> m Void
forall addr header block (m :: * -> *).
(HasHeader header, HasHeader block,
 HeaderHash header ~ HeaderHash block, MonadDelay m, MonadSTM m,
 Ord addr, Hashable addr) =>
Tracer m [TraceLabelPeer addr (FetchDecision [Point header])]
-> Tracer m (TraceLabelPeer addr (TraceFetchClientState header))
-> BlockFetchConsensusInterface addr header block m
-> FetchClientRegistry addr header block m
-> BlockFetchConfiguration
-> m Void
blockFetchLogic
          Tracer m [TraceLabelPeer Int (FetchDecision [Point BlockHeader])]
decisionTracer Tracer m (TraceLabelPeer Int (TraceFetchClientState BlockHeader))
clientStateTracer
          ((forall x. HasHeader x => FromConsensus x -> STM m UTCTime)
-> TestFetchedBlockHeap m Block
-> AnchoredFragment BlockHeader
-> Map Int (AnchoredFragment BlockHeader)
-> BlockFetchConsensusInterface Int BlockHeader Block m
forall (m :: * -> *) header block peer.
(MonadSTM m, HasHeader header, HasHeader block) =>
(forall x. HasHeader x => FromConsensus x -> STM m UTCTime)
-> TestFetchedBlockHeap m block
-> AnchoredFragment header
-> Map peer (AnchoredFragment header)
-> BlockFetchConsensusInterface peer header block m
sampleBlockFetchPolicy1 FromConsensus x -> STM m UTCTime
forall x. HasHeader x => FromConsensus x -> STM m UTCTime
forall {f :: * -> *} {b}.
(Applicative f, HasHeader b) =>
FromConsensus b -> f UTCTime
headerForgeUTCTime TestFetchedBlockHeap m Block
blockHeap AnchoredFragment BlockHeader
currentChainHeaders Map Int (AnchoredFragment BlockHeader)
candidateChainHeaders)
          FetchClientRegistry Int BlockHeader Block m
registry
          (BlockFetchConfiguration {
            bfcMaxConcurrencyBulkSync :: Word
bfcMaxConcurrencyBulkSync = Word
1,
            bfcMaxConcurrencyDeadline :: Word
bfcMaxConcurrencyDeadline = Word
2,
            bfcMaxRequestsInflight :: Word
bfcMaxRequestsInflight    = Word
10,
            bfcDecisionLoopInterval :: DiffTime
bfcDecisionLoopInterval   = DiffTime
0.01,
            bfcSalt :: Int
bfcSalt                   = Int
0
          })
        m Void -> m () -> m ()
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()

    headerForgeUTCTime :: FromConsensus b -> f UTCTime
headerForgeUTCTime (FromConsensus b
x) =
        UTCTime -> f UTCTime
forall a. a -> f a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (UTCTime -> f UTCTime) -> UTCTime -> f UTCTime
forall a b. (a -> b) -> a -> b
$ SlotNo -> UTCTime
convertSlotToTimeForTestsAssumingNoHardFork (b -> SlotNo
forall b. HasHeader b => b -> SlotNo
blockSlot b
x)

    driver :: TestFetchedBlockHeap m Block -> m ()
    driver :: TestFetchedBlockHeap m Block -> m ()
driver TestFetchedBlockHeap m Block
blockHeap = do
      STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
        heap <- TestFetchedBlockHeap m Block -> STM m (Set (Point Block))
forall (m :: * -> *) block.
TestFetchedBlockHeap m block -> STM m (Set (Point block))
getTestFetchedBlocks TestFetchedBlockHeap m Block
blockHeap
        check $
          all (\AnchoredFragment Block
c -> AnchoredFragment Block -> Point Block
forall block.
HasHeader block =>
AnchoredFragment block -> Point block
AnchoredFragment.headPoint AnchoredFragment Block
c Point Block -> Set (Point Block) -> Bool
forall a. Ord a => a -> Set a -> Bool
`Set.member` Set (Point Block)
heap)
              [candidateChain]


--
-- Sample setups of block fetch logic with fetch clients and peers
--

-- | End to end test of block fetching with fixed chain and candidates.
--
-- The setup is the block fetch logic thread and a bunch of peers each with a
-- chain. The current chain and candidate chains are fixed and the peers never
-- fail or go slowly.
--
-- Run the block fetch until all the chains are downloaded. So this assumes
-- all the candidates do intersect the current chain, and are longer, so we
-- will be interested in downloading them all.
--
blockFetchExample1 :: forall m.
                      (MonadSTM m, MonadST m, MonadAsync m, MonadDelay m,
                       MonadFork m, MonadTime m, MonadTimer m, MonadMask m,
                       MonadThrow (STM m))
                   => Tracer m [TraceLabelPeer Int
                                 (FetchDecision [Point BlockHeader])]
                   -> Tracer m (TraceLabelPeer Int
                                 (TraceFetchClientState BlockHeader))
                   -> Tracer m (TraceLabelPeer Int
                                 (TraceSendRecv (BlockFetch Block (Point Block))))
                   -> Maybe DiffTime -- ^ client's channel delay
                   -> Maybe DiffTime -- ^ server's channel delay
                   -> ControlMessageSTM m
                   -> AnchoredFragment Block   -- ^ Fixed current chain
                   -> [AnchoredFragment Block] -- ^ Fixed candidate chains
                   -> m ()
blockFetchExample1 :: forall (m :: * -> *).
(MonadSTM m, MonadST m, MonadAsync m, MonadDelay m, MonadFork m,
 MonadTime m, MonadTimer m, MonadMask m, MonadThrow (STM m)) =>
Tracer m [TraceLabelPeer Int (FetchDecision [Point BlockHeader])]
-> Tracer
     m (TraceLabelPeer Int (TraceFetchClientState BlockHeader))
-> Tracer
     m
     (TraceLabelPeer
        Int (TraceSendRecv (BlockFetch Block (Point Block))))
-> Maybe DiffTime
-> Maybe DiffTime
-> ControlMessageSTM m
-> AnchoredFragment Block
-> [AnchoredFragment Block]
-> m ()
blockFetchExample1 Tracer m [TraceLabelPeer Int (FetchDecision [Point BlockHeader])]
decisionTracer Tracer m (TraceLabelPeer Int (TraceFetchClientState BlockHeader))
clientStateTracer Tracer
  m
  (TraceLabelPeer
     Int (TraceSendRecv (BlockFetch Block (Point Block))))
clientMsgTracer
                   Maybe DiffTime
clientDelay Maybe DiffTime
serverDelay
                   ControlMessageSTM m
controlMessageSTM
                   AnchoredFragment Block
currentChain [AnchoredFragment Block]
candidateChains = do

    registry    <- m (FetchClientRegistry Int BlockHeader Block m)
forall (m :: * -> *) peer header block.
MonadSTM m =>
m (FetchClientRegistry peer header block m)
newFetchClientRegistry
    blockHeap   <- mkTestFetchedBlockHeap (anchoredChainPoints currentChain)

    peerAsyncs  <- sequence
                    [ runFetchClientAndServerAsync
                        (contramap (TraceLabelPeer peerno) clientMsgTracer)
                        (contramap (TraceLabelPeer peerno) serverMsgTracer)
                        (maxBound :: NodeToNodeVersion)
                        NodeToNode.isPipeliningEnabled
                        clientDelay serverDelay
                        registry peerno
                        (blockFetchClient NodeToNodeV_7 controlMessageSTM nullTracer)
                        (mockBlockFetchServer1 candidateChain)
                    | (peerno, candidateChain) <- zip [1..] candidateChains
                    ]
    fetchAsync  <- async $ do
      threadId <- myThreadId
      labelThread threadId "block-fetch-logic"
      blockFetch registry blockHeap
    driverAsync <- async $ do
      threadId <- myThreadId
      labelThread threadId "block-fetch-driver"
      driver blockHeap

    -- Order of shutdown here is important for this example: must kill off the
    -- fetch thread before the peer threads.
    _ <- waitAnyCancel $ [ fetchAsync, driverAsync ]
                      ++ [ peerAsync
                         | (client, server, sync, ks) <- peerAsyncs
                         , peerAsync <- [client, server, sync, ks] ]
    return ()

  where
    serverMsgTracer :: Tracer m a
serverMsgTracer = Tracer m a
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer

    currentChainHeaders :: AnchoredFragment BlockHeader
currentChainHeaders =
      (Block -> BlockHeader)
-> AnchoredFragment Block -> AnchoredFragment BlockHeader
forall block2 block1.
(HasHeader block2, HeaderHash block1 ~ HeaderHash block2) =>
(block1 -> block2)
-> AnchoredFragment block1 -> AnchoredFragment block2
AnchoredFragment.mapAnchoredFragment Block -> BlockHeader
blockHeader AnchoredFragment Block
currentChain

    candidateChainHeaders :: Map Int (AnchoredFragment BlockHeader)
candidateChainHeaders =
      [(Int, AnchoredFragment BlockHeader)]
-> Map Int (AnchoredFragment BlockHeader)
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList ([(Int, AnchoredFragment BlockHeader)]
 -> Map Int (AnchoredFragment BlockHeader))
-> [(Int, AnchoredFragment BlockHeader)]
-> Map Int (AnchoredFragment BlockHeader)
forall a b. (a -> b) -> a -> b
$ [Int]
-> [AnchoredFragment BlockHeader]
-> [(Int, AnchoredFragment BlockHeader)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Int
1..] ([AnchoredFragment BlockHeader]
 -> [(Int, AnchoredFragment BlockHeader)])
-> [AnchoredFragment BlockHeader]
-> [(Int, AnchoredFragment BlockHeader)]
forall a b. (a -> b) -> a -> b
$
      (AnchoredFragment Block -> AnchoredFragment BlockHeader)
-> [AnchoredFragment Block] -> [AnchoredFragment BlockHeader]
forall a b. (a -> b) -> [a] -> [b]
map ((Block -> BlockHeader)
-> AnchoredFragment Block -> AnchoredFragment BlockHeader
forall block2 block1.
(HasHeader block2, HeaderHash block1 ~ HeaderHash block2) =>
(block1 -> block2)
-> AnchoredFragment block1 -> AnchoredFragment block2
AnchoredFragment.mapAnchoredFragment Block -> BlockHeader
blockHeader) [AnchoredFragment Block]
candidateChains

    anchoredChainPoints :: AnchoredFragment block -> [Point block]
anchoredChainPoints AnchoredFragment block
c = AnchoredFragment block -> Point block
forall block. AnchoredFragment block -> Point block
anchorPoint AnchoredFragment block
c
                          Point block -> [Point block] -> [Point block]
forall a. a -> [a] -> [a]
: (block -> Point block) -> [block] -> [Point block]
forall a b. (a -> b) -> [a] -> [b]
map block -> Point block
forall block. HasHeader block => block -> Point block
blockPoint (AnchoredFragment block -> [block]
forall v a b. AnchoredSeq v a b -> [b]
AnchoredFragment.toOldestFirst AnchoredFragment block
c)

    blockFetch :: FetchClientRegistry Int BlockHeader Block m
               -> TestFetchedBlockHeap m Block
               -> m ()
    blockFetch :: FetchClientRegistry Int BlockHeader Block m
-> TestFetchedBlockHeap m Block -> m ()
blockFetch FetchClientRegistry Int BlockHeader Block m
registry TestFetchedBlockHeap m Block
blockHeap =
        Tracer m [TraceLabelPeer Int (FetchDecision [Point BlockHeader])]
-> Tracer
     m (TraceLabelPeer Int (TraceFetchClientState BlockHeader))
-> BlockFetchConsensusInterface Int BlockHeader Block m
-> FetchClientRegistry Int BlockHeader Block m
-> BlockFetchConfiguration
-> m Void
forall addr header block (m :: * -> *).
(HasHeader header, HasHeader block,
 HeaderHash header ~ HeaderHash block, MonadDelay m, MonadSTM m,
 Ord addr, Hashable addr) =>
Tracer m [TraceLabelPeer addr (FetchDecision [Point header])]
-> Tracer m (TraceLabelPeer addr (TraceFetchClientState header))
-> BlockFetchConsensusInterface addr header block m
-> FetchClientRegistry addr header block m
-> BlockFetchConfiguration
-> m Void
blockFetchLogic
          Tracer m [TraceLabelPeer Int (FetchDecision [Point BlockHeader])]
decisionTracer Tracer m (TraceLabelPeer Int (TraceFetchClientState BlockHeader))
clientStateTracer
          ((forall x. HasHeader x => FromConsensus x -> STM m UTCTime)
-> TestFetchedBlockHeap m Block
-> AnchoredFragment BlockHeader
-> Map Int (AnchoredFragment BlockHeader)
-> BlockFetchConsensusInterface Int BlockHeader Block m
forall (m :: * -> *) header block peer.
(MonadSTM m, HasHeader header, HasHeader block) =>
(forall x. HasHeader x => FromConsensus x -> STM m UTCTime)
-> TestFetchedBlockHeap m block
-> AnchoredFragment header
-> Map peer (AnchoredFragment header)
-> BlockFetchConsensusInterface peer header block m
sampleBlockFetchPolicy1 FromConsensus x -> STM m UTCTime
forall x. HasHeader x => FromConsensus x -> STM m UTCTime
forall {f :: * -> *} {b}.
(Applicative f, HasHeader b) =>
FromConsensus b -> f UTCTime
headerForgeUTCTime TestFetchedBlockHeap m Block
blockHeap AnchoredFragment BlockHeader
currentChainHeaders Map Int (AnchoredFragment BlockHeader)
candidateChainHeaders)
          FetchClientRegistry Int BlockHeader Block m
registry
          (BlockFetchConfiguration {
            bfcMaxConcurrencyBulkSync :: Word
bfcMaxConcurrencyBulkSync = Word
1,
            bfcMaxConcurrencyDeadline :: Word
bfcMaxConcurrencyDeadline = Word
2,
            bfcMaxRequestsInflight :: Word
bfcMaxRequestsInflight    = Word
10,
            bfcDecisionLoopInterval :: DiffTime
bfcDecisionLoopInterval   = DiffTime
0.01,
            bfcSalt :: Int
bfcSalt                   = Int
0
          })
        m Void -> m () -> m ()
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()

    headerForgeUTCTime :: FromConsensus b -> f UTCTime
headerForgeUTCTime (FromConsensus b
x) =
        UTCTime -> f UTCTime
forall a. a -> f a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (UTCTime -> f UTCTime) -> UTCTime -> f UTCTime
forall a b. (a -> b) -> a -> b
$ SlotNo -> UTCTime
convertSlotToTimeForTestsAssumingNoHardFork (b -> SlotNo
forall b. HasHeader b => b -> SlotNo
blockSlot b
x)

    driver :: TestFetchedBlockHeap m Block -> m ()
    driver :: TestFetchedBlockHeap m Block -> m ()
driver TestFetchedBlockHeap m Block
blockHeap = do
      STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
        heap <- TestFetchedBlockHeap m Block -> STM m (Set (Point Block))
forall (m :: * -> *) block.
TestFetchedBlockHeap m block -> STM m (Set (Point block))
getTestFetchedBlocks TestFetchedBlockHeap m Block
blockHeap
        check $
          all (\AnchoredFragment Block
c -> AnchoredFragment Block -> Point Block
forall block.
HasHeader block =>
AnchoredFragment block -> Point block
AnchoredFragment.headPoint AnchoredFragment Block
c Point Block -> Set (Point Block) -> Bool
forall a. Ord a => a -> Set a -> Bool
`Set.member` Set (Point Block)
heap)
              candidateChains


--
-- Sample block fetch configurations
--

sampleBlockFetchPolicy1 :: (MonadSTM m, HasHeader header, HasHeader block)
                        => (forall x. HasHeader x => FromConsensus x -> STM m UTCTime)
                        -> TestFetchedBlockHeap m block
                        -> AnchoredFragment header
                        -> Map peer (AnchoredFragment header)
                        -> BlockFetchConsensusInterface peer header block m
sampleBlockFetchPolicy1 :: forall (m :: * -> *) header block peer.
(MonadSTM m, HasHeader header, HasHeader block) =>
(forall x. HasHeader x => FromConsensus x -> STM m UTCTime)
-> TestFetchedBlockHeap m block
-> AnchoredFragment header
-> Map peer (AnchoredFragment header)
-> BlockFetchConsensusInterface peer header block m
sampleBlockFetchPolicy1 forall x. HasHeader x => FromConsensus x -> STM m UTCTime
headerFieldsForgeUTCTime TestFetchedBlockHeap m block
blockHeap AnchoredFragment header
currentChain Map peer (AnchoredFragment header)
candidateChains =
    BlockFetchConsensusInterface {
      readCandidateChains :: STM m (Map peer (AnchoredFragment header))
readCandidateChains    = Map peer (AnchoredFragment header)
-> STM m (Map peer (AnchoredFragment header))
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return Map peer (AnchoredFragment header)
candidateChains,
      readCurrentChain :: STM m (AnchoredFragment header)
readCurrentChain       = AnchoredFragment header -> STM m (AnchoredFragment header)
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return AnchoredFragment header
currentChain,
      readFetchMode :: STM m FetchMode
readFetchMode          = FetchMode -> STM m FetchMode
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return FetchMode
FetchModeBulkSync,
      readFetchedBlocks :: STM m (Point block -> Bool)
readFetchedBlocks      = (Point block -> Set (Point block) -> Bool)
-> Set (Point block) -> Point block -> Bool
forall a b c. (a -> b -> c) -> b -> a -> c
flip Point block -> Set (Point block) -> Bool
forall a. Ord a => a -> Set a -> Bool
Set.member (Set (Point block) -> Point block -> Bool)
-> STM m (Set (Point block)) -> STM m (Point block -> Bool)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
                                 TestFetchedBlockHeap m block -> STM m (Set (Point block))
forall (m :: * -> *) block.
TestFetchedBlockHeap m block -> STM m (Set (Point block))
getTestFetchedBlocks TestFetchedBlockHeap m block
blockHeap,
      readFetchedMaxSlotNo :: STM m MaxSlotNo
readFetchedMaxSlotNo   = (MaxSlotNo -> MaxSlotNo -> MaxSlotNo)
-> MaxSlotNo -> [MaxSlotNo] -> MaxSlotNo
forall b a. (b -> a -> b) -> b -> [a] -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
List.foldl' MaxSlotNo -> MaxSlotNo -> MaxSlotNo
forall a. Ord a => a -> a -> a
max MaxSlotNo
NoMaxSlotNo ([MaxSlotNo] -> MaxSlotNo)
-> (Set (Point block) -> [MaxSlotNo])
-> Set (Point block)
-> MaxSlotNo
forall b c a. (b -> c) -> (a -> b) -> a -> c
.
                               (Point block -> MaxSlotNo) -> [Point block] -> [MaxSlotNo]
forall a b. (a -> b) -> [a] -> [b]
map (WithOrigin SlotNo -> MaxSlotNo
maxSlotNoFromWithOrigin (WithOrigin SlotNo -> MaxSlotNo)
-> (Point block -> WithOrigin SlotNo) -> Point block -> MaxSlotNo
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Point block -> WithOrigin SlotNo
forall {k} (block :: k). Point block -> WithOrigin SlotNo
pointSlot) ([Point block] -> [MaxSlotNo])
-> (Set (Point block) -> [Point block])
-> Set (Point block)
-> [MaxSlotNo]
forall b c a. (b -> c) -> (a -> b) -> a -> c
.
                               Set (Point block) -> [Point block]
forall a. Set a -> [a]
Set.elems (Set (Point block) -> MaxSlotNo)
-> STM m (Set (Point block)) -> STM m MaxSlotNo
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
                               TestFetchedBlockHeap m block -> STM m (Set (Point block))
forall (m :: * -> *) block.
TestFetchedBlockHeap m block -> STM m (Set (Point block))
getTestFetchedBlocks TestFetchedBlockHeap m block
blockHeap,
      mkAddFetchedBlock :: WhetherReceivingTentativeBlocks
-> STM m (Point block -> block -> m ())
mkAddFetchedBlock      = \WhetherReceivingTentativeBlocks
_enablePipelining -> (Point block -> block -> m ())
-> STM m (Point block -> block -> m ())
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((Point block -> block -> m ())
 -> STM m (Point block -> block -> m ()))
-> (Point block -> block -> m ())
-> STM m (Point block -> block -> m ())
forall a b. (a -> b) -> a -> b
$ TestFetchedBlockHeap m block -> Point block -> block -> m ()
forall (m :: * -> *) block.
TestFetchedBlockHeap m block -> Point block -> block -> m ()
addTestFetchedBlock TestFetchedBlockHeap m block
blockHeap,

      HasCallStack =>
AnchoredFragment header -> AnchoredFragment header -> Bool
AnchoredFragment header -> AnchoredFragment header -> Bool
forall {block} {block}.
(HasHeader block, HasHeader block) =>
AnchoredFragment block -> AnchoredFragment block -> Bool
plausibleCandidateChain :: forall {block} {block}.
(HasHeader block, HasHeader block) =>
AnchoredFragment block -> AnchoredFragment block -> Bool
plausibleCandidateChain :: HasCallStack =>
AnchoredFragment header -> AnchoredFragment header -> Bool
plausibleCandidateChain,
      HasCallStack =>
AnchoredFragment header -> AnchoredFragment header -> Ordering
AnchoredFragment header -> AnchoredFragment header -> Ordering
forall {block} {block}.
(HasHeader block, HasHeader block) =>
AnchoredFragment block -> AnchoredFragment block -> Ordering
compareCandidateChains :: forall {block} {block}.
(HasHeader block, HasHeader block) =>
AnchoredFragment block -> AnchoredFragment block -> Ordering
compareCandidateChains :: HasCallStack =>
AnchoredFragment header -> AnchoredFragment header -> Ordering
compareCandidateChains,

      blockFetchSize :: header -> SizeInBytes
blockFetchSize         = \header
_ -> SizeInBytes
2000,
      blockMatchesHeader :: header -> block -> Bool
blockMatchesHeader     = \header
_ block
_ -> Bool
True,

      headerForgeUTCTime :: FromConsensus header -> STM m UTCTime
headerForgeUTCTime     = FromConsensus header -> STM m UTCTime
forall x. HasHeader x => FromConsensus x -> STM m UTCTime
headerFieldsForgeUTCTime,
      blockForgeUTCTime :: FromConsensus block -> STM m UTCTime
blockForgeUTCTime      = FromConsensus block -> STM m UTCTime
forall x. HasHeader x => FromConsensus x -> STM m UTCTime
headerFieldsForgeUTCTime
      }
  where
    plausibleCandidateChain :: AnchoredFragment block -> AnchoredFragment block -> Bool
plausibleCandidateChain AnchoredFragment block
cur AnchoredFragment block
candidate =
      AnchoredFragment block -> WithOrigin BlockNo
forall block.
HasHeader block =>
AnchoredFragment block -> WithOrigin BlockNo
AnchoredFragment.headBlockNo AnchoredFragment block
candidate WithOrigin BlockNo -> WithOrigin BlockNo -> Bool
forall a. Ord a => a -> a -> Bool
> AnchoredFragment block -> WithOrigin BlockNo
forall block.
HasHeader block =>
AnchoredFragment block -> WithOrigin BlockNo
AnchoredFragment.headBlockNo AnchoredFragment block
cur

    compareCandidateChains :: AnchoredFragment block -> AnchoredFragment block -> Ordering
compareCandidateChains AnchoredFragment block
c1 AnchoredFragment block
c2 =
      AnchoredFragment block -> WithOrigin BlockNo
forall block.
HasHeader block =>
AnchoredFragment block -> WithOrigin BlockNo
AnchoredFragment.headBlockNo AnchoredFragment block
c1 WithOrigin BlockNo -> WithOrigin BlockNo -> Ordering
forall a. Ord a => a -> a -> Ordering
`compare` AnchoredFragment block -> WithOrigin BlockNo
forall block.
HasHeader block =>
AnchoredFragment block -> WithOrigin BlockNo
AnchoredFragment.headBlockNo AnchoredFragment block
c2

-- | Roughly 10ms ping time and 1MBit\/s bandwidth, leads to ~2200 bytes in
-- flight minimum.
--
exampleFixedPeerGSVs :: PeerGSV
exampleFixedPeerGSVs :: PeerGSV
exampleFixedPeerGSVs =
    PeerGSV{Time
sampleTime :: Time
sampleTime :: Time
sampleTime, GSV
outboundGSV :: GSV
outboundGSV :: GSV
outboundGSV, GSV
inboundGSV :: GSV
inboundGSV :: GSV
inboundGSV}
  where
    inboundGSV :: GSV
inboundGSV  = DiffTime -> DiffTime -> Distribution DiffTime -> GSV
ballisticGSV DiffTime
10e-3 DiffTime
10e-6 (DiffTime -> Distribution DiffTime
forall n. n -> Distribution n
degenerateDistribution DiffTime
0)
    outboundGSV :: GSV
outboundGSV = GSV
inboundGSV
    sampleTime :: Time
sampleTime  = DiffTime -> Time
Time DiffTime
0


--
-- Utils to run fetch clients and servers
--

runFetchClient :: (MonadAsync m, MonadDelay m, MonadFork m, MonadMask m,
                   MonadThrow (STM m), MonadST m, MonadTime m, MonadTimer m,
                   Ord peerid, Serialise block, Serialise point,
                   Typeable block, ShowProxy block)
                => Tracer m (TraceSendRecv (BlockFetch block point))
                -> version
                -> (version -> WhetherReceivingTentativeBlocks)
                -> FetchClientRegistry peerid header block m
                -> peerid
                -> Channel m LBS.ByteString
                -> (  FetchClientContext header block m
                   -> PeerPipelined (BlockFetch block point) AsClient BFIdle m a)
                -> m a
runFetchClient :: forall (m :: * -> *) peerid block point version header a.
(MonadAsync m, MonadDelay m, MonadFork m, MonadMask m,
 MonadThrow (STM m), MonadST m, MonadTime m, MonadTimer m,
 Ord peerid, Serialise block, Serialise point, Typeable block,
 ShowProxy block) =>
Tracer m (TraceSendRecv (BlockFetch block point))
-> version
-> (version -> WhetherReceivingTentativeBlocks)
-> FetchClientRegistry peerid header block m
-> peerid
-> Channel m ByteString
-> (FetchClientContext header block m
    -> PeerPipelined (BlockFetch block point) 'AsClient 'BFIdle m a)
-> m a
runFetchClient Tracer m (TraceSendRecv (BlockFetch block point))
tracer version
version version -> WhetherReceivingTentativeBlocks
isPipeliningEnabled FetchClientRegistry peerid header block m
registry peerid
peerid Channel m ByteString
channel FetchClientContext header block m
-> PeerPipelined (BlockFetch block point) 'AsClient 'BFIdle m a
client =
    FetchClientRegistry peerid header block m
-> version
-> (version -> WhetherReceivingTentativeBlocks)
-> peerid
-> (FetchClientContext header block m -> m a)
-> m a
forall (m :: * -> *) a peer header block version.
(MonadFork m, MonadMask m, MonadTimer m, Ord peer) =>
FetchClientRegistry peer header block m
-> version
-> (version -> WhetherReceivingTentativeBlocks)
-> peer
-> (FetchClientContext header block m -> m a)
-> m a
bracketFetchClient FetchClientRegistry peerid header block m
registry version
version version -> WhetherReceivingTentativeBlocks
isPipeliningEnabled peerid
peerid ((FetchClientContext header block m -> m a) -> m a)
-> (FetchClientContext header block m -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \FetchClientContext header block m
clientCtx ->
      (a, Maybe ByteString) -> a
forall a b. (a, b) -> a
fst ((a, Maybe ByteString) -> a) -> m (a, Maybe ByteString) -> m a
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
        Tracer m (TraceSendRecv (BlockFetch block point))
-> Codec (BlockFetch block point) DeserialiseFailure m ByteString
-> ProtocolSizeLimits (BlockFetch block point) ByteString
-> ProtocolTimeLimits (BlockFetch block point)
-> Channel m ByteString
-> PeerPipelined (BlockFetch block point) 'AsClient 'BFIdle m a
-> m (a, Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
       a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
 MonadTimer m, forall (st' :: ps). Show (ClientHasAgency st'),
 forall (st' :: ps). Show (ServerHasAgency st'), ShowProxy ps,
 Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> PeerPipelined ps pr st m a
-> m (a, Maybe bytes)
runPipelinedPeerWithLimits Tracer m (TraceSendRecv (BlockFetch block point))
tracer Codec (BlockFetch block point) DeserialiseFailure m ByteString
codec ((ByteString -> Word)
-> ProtocolSizeLimits (BlockFetch block point) ByteString
forall {k} {k1} bytes (block :: k) (point :: k1).
(bytes -> Word)
-> ProtocolSizeLimits (BlockFetch block point) bytes
byteLimitsBlockFetch (Int64 -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> Word) -> (ByteString -> Int64) -> ByteString -> Word
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Int64
LBS.length))
          ProtocolTimeLimits (BlockFetch block point)
forall {k} {k1} (block :: k) (point :: k1).
ProtocolTimeLimits (BlockFetch block point)
timeLimitsBlockFetch Channel m ByteString
channel (FetchClientContext header block m
-> PeerPipelined (BlockFetch block point) 'AsClient 'BFIdle m a
client FetchClientContext header block m
clientCtx)
  where
    codec :: Codec (BlockFetch block point) DeserialiseFailure m ByteString
codec = (block -> Encoding)
-> (forall s. Decoder s block)
-> (point -> Encoding)
-> (forall s. Decoder s point)
-> Codec (BlockFetch block point) DeserialiseFailure m ByteString
forall block point (m :: * -> *).
MonadST m =>
(block -> Encoding)
-> (forall s. Decoder s block)
-> (point -> Encoding)
-> (forall s. Decoder s point)
-> Codec (BlockFetch block point) DeserialiseFailure m ByteString
codecBlockFetch block -> Encoding
forall a. Serialise a => a -> Encoding
encode Decoder s block
forall s. Decoder s block
forall a s. Serialise a => Decoder s a
decode point -> Encoding
forall a. Serialise a => a -> Encoding
encode Decoder s point
forall s. Decoder s point
forall a s. Serialise a => Decoder s a
decode

runFetchServer :: (MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
                   MonadST m, MonadTime m, MonadTimer m,
                   Serialise block, Serialise point,
                   Typeable block,
                   ShowProxy block)
                => Tracer m (TraceSendRecv (BlockFetch block point))
                -> Channel m LBS.ByteString
                -> BlockFetchServer block point m a
                -> m a
runFetchServer :: forall (m :: * -> *) block point a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
 MonadST m, MonadTime m, MonadTimer m, Serialise block,
 Serialise point, Typeable block, ShowProxy block) =>
Tracer m (TraceSendRecv (BlockFetch block point))
-> Channel m ByteString -> BlockFetchServer block point m a -> m a
runFetchServer Tracer m (TraceSendRecv (BlockFetch block point))
tracer Channel m ByteString
channel BlockFetchServer block point m a
server =
    (a, Maybe ByteString) -> a
forall a b. (a, b) -> a
fst ((a, Maybe ByteString) -> a) -> m (a, Maybe ByteString) -> m a
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
      Tracer m (TraceSendRecv (BlockFetch block point))
-> Codec (BlockFetch block point) DeserialiseFailure m ByteString
-> ProtocolSizeLimits (BlockFetch block point) ByteString
-> ProtocolTimeLimits (BlockFetch block point)
-> Channel m ByteString
-> Peer (BlockFetch block point) 'AsServer 'BFIdle m a
-> m (a, Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
       a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
 MonadTimer m, forall (st' :: ps). Show (ClientHasAgency st'),
 forall (st' :: ps). Show (ServerHasAgency st'), ShowProxy ps,
 Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> Peer ps pr st m a
-> m (a, Maybe bytes)
runPeerWithLimits Tracer m (TraceSendRecv (BlockFetch block point))
tracer Codec (BlockFetch block point) DeserialiseFailure m ByteString
codec ((ByteString -> Word)
-> ProtocolSizeLimits (BlockFetch block point) ByteString
forall {k} {k1} bytes (block :: k) (point :: k1).
(bytes -> Word)
-> ProtocolSizeLimits (BlockFetch block point) bytes
byteLimitsBlockFetch (Int64 -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> Word) -> (ByteString -> Int64) -> ByteString -> Word
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Int64
LBS.length))
        ProtocolTimeLimits (BlockFetch block point)
forall {k} {k1} (block :: k) (point :: k1).
ProtocolTimeLimits (BlockFetch block point)
timeLimitsBlockFetch Channel m ByteString
channel (BlockFetchServer block point m a
-> Peer (BlockFetch block point) 'AsServer 'BFIdle m a
forall block point (m :: * -> *) a.
Functor m =>
BlockFetchServer block point m a
-> Peer (BlockFetch block point) 'AsServer 'BFIdle m a
blockFetchServerPeer BlockFetchServer block point m a
server)
  where
    codec :: Codec (BlockFetch block point) DeserialiseFailure m ByteString
codec = (block -> Encoding)
-> (forall s. Decoder s block)
-> (point -> Encoding)
-> (forall s. Decoder s point)
-> Codec (BlockFetch block point) DeserialiseFailure m ByteString
forall block point (m :: * -> *).
MonadST m =>
(block -> Encoding)
-> (forall s. Decoder s block)
-> (point -> Encoding)
-> (forall s. Decoder s point)
-> Codec (BlockFetch block point) DeserialiseFailure m ByteString
codecBlockFetch block -> Encoding
forall a. Serialise a => a -> Encoding
encode Decoder s block
forall s. Decoder s block
forall a s. Serialise a => Decoder s a
decode point -> Encoding
forall a. Serialise a => a -> Encoding
encode Decoder s point
forall s. Decoder s point
forall a s. Serialise a => Decoder s a
decode

runFetchClientAndServerAsync
               :: forall peerid block header version m a b.
                  (MonadAsync m, MonadDelay m, MonadFork m, MonadMask m,
                   MonadThrow (STM m), MonadST m, MonadTime m, MonadTimer m,
                   Ord peerid, Show peerid,
                   Serialise header, Serialise block,
                   Serialise (HeaderHash block),
                   Typeable block,
                   ShowProxy block)
                => Tracer m (TraceSendRecv (BlockFetch block (Point block)))
                -> Tracer m (TraceSendRecv (BlockFetch block (Point block)))
                -> version
                -> (version -> WhetherReceivingTentativeBlocks)
                -- ^ is pipelining enabled function
                -> Maybe DiffTime -- ^ client's channel delay
                -> Maybe DiffTime -- ^ server's channel delay
                -> FetchClientRegistry peerid header block m
                -> peerid
                -> (  FetchClientContext header block m
                   -> PeerPipelined (BlockFetch block (Point block)) AsClient BFIdle m a)
                -> BlockFetchServer block (Point block) m b
                -> m (Async m a, Async m b, Async m (), Async m ())
runFetchClientAndServerAsync :: forall peerid block header version (m :: * -> *) a b.
(MonadAsync m, MonadDelay m, MonadFork m, MonadMask m,
 MonadThrow (STM m), MonadST m, MonadTime m, MonadTimer m,
 Ord peerid, Show peerid, Serialise header, Serialise block,
 Serialise (HeaderHash block), Typeable block, ShowProxy block) =>
Tracer m (TraceSendRecv (BlockFetch block (Point block)))
-> Tracer m (TraceSendRecv (BlockFetch block (Point block)))
-> version
-> (version -> WhetherReceivingTentativeBlocks)
-> Maybe DiffTime
-> Maybe DiffTime
-> FetchClientRegistry peerid header block m
-> peerid
-> (FetchClientContext header block m
    -> PeerPipelined
         (BlockFetch block (Point block)) 'AsClient 'BFIdle m a)
-> BlockFetchServer block (Point block) m b
-> m (Async m a, Async m b, Async m (), Async m ())
runFetchClientAndServerAsync Tracer m (TraceSendRecv (BlockFetch block (Point block)))
clientTracer Tracer m (TraceSendRecv (BlockFetch block (Point block)))
serverTracer
                             version
version      version -> WhetherReceivingTentativeBlocks
isPipeliningEnabled
                             Maybe DiffTime
clientDelay  Maybe DiffTime
serverDelay
                             FetchClientRegistry peerid header block m
registry peerid
peerid FetchClientContext header block m
-> PeerPipelined
     (BlockFetch block (Point block)) 'AsClient 'BFIdle m a
client BlockFetchServer block (Point block) m b
server = do
    (clientChannel, serverChannel) <- m (Channel m ByteString, Channel m ByteString)
forall (m :: * -> *) a. MonadSTM m => m (Channel m a, Channel m a)
createConnectedChannels

    clientAsync <- async $ do
      threadId <- myThreadId
      labelThread threadId ("block-fetch-client-" ++ show peerid)
      runFetchClient
        clientTracer
        version
        isPipeliningEnabled
        registry peerid
        (fromMaybe id (delayChannel <$> clientDelay) clientChannel)
        client

    serverAsync <- async $ do
      threadId <- myThreadId
      labelThread threadId ("block-fetch-server-" ++ show peerid)
      runFetchServer
        serverTracer
        (fromMaybe id (delayChannel <$> serverDelay) serverChannel)
        server

    -- we are tagging messages with the current peerid, not the target
    -- one, this is different than what's intended but it's fine to do that in
    -- these examples;
    syncClientAsync <- async $ do
      threadId <- myThreadId
      labelThread threadId ("registry-" ++ show peerid)
      bracketSyncWithFetchClient
        registry peerid
        (forever (threadDelay 1000) >> return ())
    keepAliveAsync <- async $ do
      threadId <- myThreadId
      labelThread threadId ("keep-alive-" ++ show peerid)
      bracketKeepAliveClient
        registry peerid
        (\StrictTVar m (Map peerid PeerGSV)
_ -> m () -> m Any
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (DiffTime -> m ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
1000) m Any -> m () -> m ()
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ())

    return (clientAsync, serverAsync, syncClientAsync, keepAliveAsync)


--
-- Mock block fetch servers
--

-- | A demo server for the block fetch protocol.
--
-- It serves up ranges on a single given 'AnchoredFragment'. It does not
-- simulate any delays, so is not suitable for timing-accurate simulations.
--
mockBlockFetchServer1 :: forall block m.
                        (MonadSTM m, HasHeader block)
                      => AnchoredFragment block
                      -> BlockFetchServer block (Point block) m ()
mockBlockFetchServer1 :: forall block (m :: * -> *).
(MonadSTM m, HasHeader block) =>
AnchoredFragment block -> BlockFetchServer block (Point block) m ()
mockBlockFetchServer1 AnchoredFragment block
chain =
    BlockFetchServer block (Point block) m ()
senderSide
  where
    senderSide :: BlockFetchServer block (Point block) m ()
    senderSide :: BlockFetchServer block (Point block) m ()
senderSide = (ChainRange (Point block)
 -> m (BlockFetchBlockSender block (Point block) m ()))
-> () -> BlockFetchServer block (Point block) m ()
forall point (m :: * -> *) block a.
(ChainRange point -> m (BlockFetchBlockSender block point m a))
-> a -> BlockFetchServer block point m a
BlockFetchServer ChainRange (Point block)
-> m (BlockFetchBlockSender block (Point block) m ())
receiveReq ()

    receiveReq :: ChainRange (Point block)
               -> m (BlockFetchBlockSender block (Point block) m ())
    receiveReq :: ChainRange (Point block)
-> m (BlockFetchBlockSender block (Point block) m ())
receiveReq (ChainRange Point block
lpoint Point block
upoint) =
      -- We can only assert this for tests, not for the real thing.
      Bool
-> m (BlockFetchBlockSender block (Point block) m ())
-> m (BlockFetchBlockSender block (Point block) m ())
forall a. HasCallStack => Bool -> a -> a
assert (Point block -> WithOrigin SlotNo
forall {k} (block :: k). Point block -> WithOrigin SlotNo
pointSlot Point block
lpoint WithOrigin SlotNo -> WithOrigin SlotNo -> Bool
forall a. Ord a => a -> a -> Bool
<= Point block -> WithOrigin SlotNo
forall {k} (block :: k). Point block -> WithOrigin SlotNo
pointSlot Point block
upoint) (m (BlockFetchBlockSender block (Point block) m ())
 -> m (BlockFetchBlockSender block (Point block) m ()))
-> m (BlockFetchBlockSender block (Point block) m ())
-> m (BlockFetchBlockSender block (Point block) m ())
forall a b. (a -> b) -> a -> b
$
      case AnchoredFragment block
-> Point block -> Point block -> Maybe (AnchoredFragment block)
forall block.
HasHeader block =>
AnchoredFragment block
-> Point block -> Point block -> Maybe (AnchoredFragment block)
AnchoredFragment.sliceRange AnchoredFragment block
chain Point block
lpoint Point block
upoint of
        Maybe (AnchoredFragment block)
Nothing     -> BlockFetchBlockSender block (Point block) m ()
-> m (BlockFetchBlockSender block (Point block) m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (BlockFetchBlockSender block (Point block) m ()
 -> m (BlockFetchBlockSender block (Point block) m ()))
-> BlockFetchBlockSender block (Point block) m ()
-> m (BlockFetchBlockSender block (Point block) m ())
forall a b. (a -> b) -> a -> b
$ m (BlockFetchServer block (Point block) m ())
-> BlockFetchBlockSender block (Point block) m ()
forall (m :: * -> *) block point a.
m (BlockFetchServer block point m a)
-> BlockFetchBlockSender block point m a
SendMsgNoBlocks (BlockFetchServer block (Point block) m ()
-> m (BlockFetchServer block (Point block) m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return BlockFetchServer block (Point block) m ()
senderSide)
        Just AnchoredFragment block
chain' -> BlockFetchBlockSender block (Point block) m ()
-> m (BlockFetchBlockSender block (Point block) m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (BlockFetchBlockSender block (Point block) m ()
 -> m (BlockFetchBlockSender block (Point block) m ()))
-> BlockFetchBlockSender block (Point block) m ()
-> m (BlockFetchBlockSender block (Point block) m ())
forall a b. (a -> b) -> a -> b
$ m (BlockFetchSendBlocks block (Point block) m ())
-> BlockFetchBlockSender block (Point block) m ()
forall (m :: * -> *) block point a.
m (BlockFetchSendBlocks block point m a)
-> BlockFetchBlockSender block point m a
SendMsgStartBatch ([block] -> m (BlockFetchSendBlocks block (Point block) m ())
sendBlocks [block]
blocks)
          where blocks :: [block]
blocks = AnchoredFragment block -> [block]
forall v a b. AnchoredSeq v a b -> [b]
AnchoredFragment.toOldestFirst AnchoredFragment block
chain'


    sendBlocks :: [block] -> m (BlockFetchSendBlocks block (Point block) m ())
    sendBlocks :: [block] -> m (BlockFetchSendBlocks block (Point block) m ())
sendBlocks []     = BlockFetchSendBlocks block (Point block) m ()
-> m (BlockFetchSendBlocks block (Point block) m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (BlockFetchSendBlocks block (Point block) m ()
 -> m (BlockFetchSendBlocks block (Point block) m ()))
-> BlockFetchSendBlocks block (Point block) m ()
-> m (BlockFetchSendBlocks block (Point block) m ())
forall a b. (a -> b) -> a -> b
$ m (BlockFetchServer block (Point block) m ())
-> BlockFetchSendBlocks block (Point block) m ()
forall (m :: * -> *) block point a.
m (BlockFetchServer block point m a)
-> BlockFetchSendBlocks block point m a
SendMsgBatchDone (BlockFetchServer block (Point block) m ()
-> m (BlockFetchServer block (Point block) m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return BlockFetchServer block (Point block) m ()
senderSide)
    sendBlocks (block
b:[block]
bs) = BlockFetchSendBlocks block (Point block) m ()
-> m (BlockFetchSendBlocks block (Point block) m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (BlockFetchSendBlocks block (Point block) m ()
 -> m (BlockFetchSendBlocks block (Point block) m ()))
-> BlockFetchSendBlocks block (Point block) m ()
-> m (BlockFetchSendBlocks block (Point block) m ())
forall a b. (a -> b) -> a -> b
$ block
-> m (BlockFetchSendBlocks block (Point block) m ())
-> BlockFetchSendBlocks block (Point block) m ()
forall block (m :: * -> *) point a.
block
-> m (BlockFetchSendBlocks block point m a)
-> BlockFetchSendBlocks block point m a
SendMsgBlock block
b ([block] -> m (BlockFetchSendBlocks block (Point block) m ())
sendBlocks [block]
bs)


--
-- Mock downloaded block heap
--

-- | This provides an interface to a collection of dowloaded blocks. This is
-- enough to implement the 'addFetchedBlock' and 'readFetchedBlocks' methods
-- in the 'BlockFetchConsensusInterface' and related interfaces.
--
-- The interface is enough to use in examples and tests.
--
data TestFetchedBlockHeap m block = TestFetchedBlockHeap {
       forall (m :: * -> *) block.
TestFetchedBlockHeap m block -> STM m (Set (Point block))
getTestFetchedBlocks :: STM m (Set (Point block)),
       forall (m :: * -> *) block.
TestFetchedBlockHeap m block -> Point block -> block -> m ()
addTestFetchedBlock  :: Point block -> block -> m ()
     }

-- | Make a 'TestFetchedBlockHeap' using a simple in-memory 'Map', stored in an
-- 'STM' 'TVar'.
--
-- This is suitable for examples and tests.
--
mkTestFetchedBlockHeap :: (MonadSTM m, Ord (Point block))
                       => [Point block]
                       -> m (TestFetchedBlockHeap m block)
mkTestFetchedBlockHeap :: forall (m :: * -> *) block.
(MonadSTM m, Ord (Point block)) =>
[Point block] -> m (TestFetchedBlockHeap m block)
mkTestFetchedBlockHeap [Point block]
points = do
    v <- Set (Point block) -> m (StrictTVar m (Set (Point block)))
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ([Point block] -> Set (Point block)
forall a. Ord a => [a] -> Set a
Set.fromList [Point block]
points)
    return TestFetchedBlockHeap {
      getTestFetchedBlocks = readTVar v,
      addTestFetchedBlock  = \Point block
p block
_b -> STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (StrictTVar m (Set (Point block))
-> (Set (Point block) -> Set (Point block)) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> (a -> a) -> STM m ()
modifyTVar StrictTVar m (Set (Point block))
v (Point block -> Set (Point block) -> Set (Point block)
forall a. Ord a => a -> Set a -> Set a
Set.insert Point block
p))
    }