{-# LANGUAGE DataKinds           #-}
{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE Rank2Types          #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies        #-}

module Ouroboros.Network.BlockFetch.Examples
  ( blockFetchExample0
  , blockFetchExample1
  , mockBlockFetchServer1
  , exampleFixedPeerGSVs
  ) where

import Codec.Serialise (Serialise (..))
import Data.ByteString.Lazy qualified as LBS
import Data.Foldable (traverse_)
import Data.List as List (foldl')
import Data.Map (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe (fromMaybe)
import Data.Set (Set)
import Data.Set qualified as Set

import Control.Concurrent.Class.MonadSTM.Strict
import Control.Exception (assert)
import Control.Monad (forever)
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadST
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer, contramap, nullTracer)

import Ouroboros.Network.AnchoredFragment (AnchoredFragment, anchorPoint)
import Ouroboros.Network.AnchoredFragment qualified as AnchoredFragment
import Ouroboros.Network.Block

import Network.TypedProtocol.Peer.Client

import Ouroboros.Network.AnchoredFragment qualified as AF
import Ouroboros.Network.BlockFetch
import Ouroboros.Network.BlockFetch.Client
import Ouroboros.Network.BlockFetch.ConsensusInterface (ChainSelStarvation (..))
import Ouroboros.Network.Channel
import Ouroboros.Network.ControlMessage
import Ouroboros.Network.DeltaQ
import Ouroboros.Network.Driver
import Ouroboros.Network.NodeToNode (NodeToNodeVersion (..))
import Ouroboros.Network.Protocol.BlockFetch.Codec
import Ouroboros.Network.Protocol.BlockFetch.Server
import Ouroboros.Network.Protocol.BlockFetch.Type
import Ouroboros.Network.Util.ShowProxy

import Ouroboros.Network.BlockFetch.Decision.Trace (TraceDecisionEvent)
import Ouroboros.Network.Mock.ConcreteBlock


-- | Run a single block fetch protocol until the chain is downloaded.
--
-- The example wires together, for a single peer (peer number @1@):
--
-- * a block fetch client and a 'mockBlockFetchServer1' serving the candidate
--   chain, started through 'runFetchClientAndServerAsync';
-- * the block fetch decision logic ('blockFetchLogic') in its own thread;
-- * a driver thread which blocks until the head point of the candidate chain
--   is present in the set of fetched blocks.
--
-- The calling thread blocks until the supplied 'ControlMessageSTM' yields
-- 'Terminate'; it then cancels the fetch logic thread and afterwards waits
-- for any one of the remaining threads to finish, cancelling the others.
--
blockFetchExample0 :: forall m.
                      (MonadST m, MonadAsync m, MonadDelay m, MonadFork m,
                       MonadTime m, MonadTimer m, MonadMask m, MonadThrow (STM m))
                   => FetchMode
                   -> Tracer m (TraceDecisionEvent Int BlockHeader)
                   -> Tracer m (TraceLabelPeer Int
                                 (TraceFetchClientState BlockHeader))
                   -> Tracer m (TraceLabelPeer Int
                                 (TraceSendRecv (BlockFetch Block (Point Block))))
                   -> Maybe DiffTime -- ^ client's channel delay
                   -> Maybe DiffTime -- ^ server's channel delay
                   -> ControlMessageSTM m
                   -> AnchoredFragment Block -- ^ Fixed current chain
                   -> AnchoredFragment Block -- ^ Fixed candidate chain
                   -> m ()
blockFetchExample0 fetchMode decisionTracer clientStateTracer clientMsgTracer
                   clientDelay serverDelay
                   controlMessageSTM
                   currentChain candidateChain = do

    registry    <- newFetchClientRegistry :: m (FetchClientRegistry Int BlockHeader Block m)
    blockHeap   <- mkTestFetchedBlockHeap (anchoredChainPoints currentChain)

    (clientAsync, serverAsync, syncClientAsync, keepAliveAsync)
                <- runFetchClientAndServerAsync
                    (contramap (TraceLabelPeer peerno) clientMsgTracer)
                    (contramap (TraceLabelPeer peerno) serverMsgTracer)
                    (maxBound :: NodeToNodeVersion)
                    clientDelay serverDelay
                    registry peerno
                    (blockFetchClient (maxBound :: NodeToNodeVersion) controlMessageSTM nullTracer)
                    (mockBlockFetchServer1 candidateChain)

    fetchAsync  <- async $ do
      threadId <- myThreadId
      labelThread threadId "block-fetch-logic"
      blockFetch registry blockHeap
    driverAsync <- async $ do
      threadId <- myThreadId
      labelThread threadId "driver"
      driver blockHeap

    -- Order of shutdown is important for this example: must kill the fetch
    -- thread before the peer threads, or otherwise the first assertion in
    -- `fetchDecisionsForStateSnapshot` can be triggered.
    atomically $ controlMessageSTM >>= check . (Terminate ==)
    cancel fetchAsync
    _ <- waitAnyCancel [ driverAsync, clientAsync, serverAsync
                       , syncClientAsync, keepAliveAsync
                       ]
    return ()

  where
    -- The single peer of this example.
    peerno :: Int
    peerno = 1

    -- Server side messages are not traced.
    serverMsgTracer = nullTracer

    currentChainHeaders :: AnchoredFragment BlockHeader
    currentChainHeaders =
      AnchoredFragment.mapAnchoredFragment blockHeader currentChain

    -- Candidate header chains keyed by peer number (numbering starts at 1,
    -- matching 'peerno').
    candidateChainHeaders :: Map Int (AnchoredFragment BlockHeader)
    candidateChainHeaders =
      Map.fromList $ zip [1..] $
      map (AnchoredFragment.mapAnchoredFragment blockHeader) [candidateChain]

    -- The anchor point followed by the points of all blocks, oldest first.
    anchoredChainPoints c = anchorPoint c
                          : map blockPoint (AnchoredFragment.toOldestFirst c)

    -- Run the block fetch decision logic with a fixed configuration,
    -- discarding its result.
    blockFetch :: FetchClientRegistry Int BlockHeader Block m
               -> TestFetchedBlockHeap m Block
               -> m ()
    blockFetch registry blockHeap =
        blockFetchLogic
          decisionTracer clientStateTracer
          (sampleBlockFetchPolicy1 fetchMode headerForgeUTCTime blockHeap currentChainHeaders candidateChainHeaders)
          registry
          (BlockFetchConfiguration {
            bfcMaxConcurrencyBulkSync = 1,
            bfcMaxConcurrencyDeadline = 2,
            bfcMaxRequestsInflight    = 10,
            bfcDecisionLoopIntervalGenesis = 0.04,
            bfcDecisionLoopIntervalPraos = 0.01,
            bfcSalt                   = 0,
            bfcGenesisBFConfig        = GenesisBlockFetchConfiguration
              { gbfcGracePeriod = 10 -- seconds
              }
          })
        >> return ()

    -- Forge time of a header, derived from its slot number.
    headerForgeUTCTime (FromConsensus x) =
        pure $ convertSlotToTimeForTestsAssumingNoHardFork (blockSlot x)

    -- Blocks until the head point of the candidate chain has been fetched.
    driver :: TestFetchedBlockHeap m Block -> m ()
    driver blockHeap = do
      atomically $ do
        heap <- getTestFetchedBlocks blockHeap
        check $
          all (\c -> AnchoredFragment.headPoint c `Set.member` heap)
              [candidateChain]


--
-- Sample setups of block fetch logic with fetch clients and peers
--

-- | End to end test of block fetching with fixed chain and candidates.
--
-- The setup is the block fetch logic thread and a bunch of peers each with a
-- chain. The current chain and candidate chains are fixed and the peers never
-- fail or go slowly.
--
-- Run the block fetch until all the chains are downloaded. So this assumes
-- all the candidates do intersect the current chain, and are longer, so we
-- will be interested in downloading them all.
--
-- Peers are numbered from @1@ in the order of the candidate chains. The run
-- is bounded by a timer thread which sleeps for one second per block in the
-- candidate chains; once any of the fetch logic, timer, or non-client peer
-- threads finishes, the others are cancelled, the control message is set to
-- 'Terminate' and the client threads are awaited.
--
blockFetchExample1 :: forall m.
                      (MonadST m, MonadAsync m, MonadDelay m, MonadFork m,
                       MonadTime m, MonadTimer m, MonadMask m, MonadThrow (STM m))
                   => FetchMode
                   -> Tracer m (TraceDecisionEvent Int BlockHeader)
                   -> Tracer m (TraceLabelPeer Int
                                 (TraceFetchClientState BlockHeader))
                   -> Tracer m (TraceLabelPeer Int
                                 (TraceSendRecv (BlockFetch Block (Point Block))))
                   -> Maybe DiffTime -- ^ client's channel delay
                   -> Maybe DiffTime -- ^ server's channel delay
                   -> AnchoredFragment Block   -- ^ Fixed current chain
                   -> [AnchoredFragment Block] -- ^ Fixed candidate chains
                   -> m ()
blockFetchExample1 fetchMode decisionTracer clientStateTracer clientMsgTracer
                   clientDelay serverDelay
                   currentChain candidateChains = do
    controlMessageVar <- newTVarIO Continue
    let controlMessageSTM = readTVar controlMessageVar

    registry    <- newFetchClientRegistry
    blockHeap   <- mkTestFetchedBlockHeap (anchoredChainPoints currentChain)

    peerAsyncs  <- sequence
                    [ runFetchClientAndServerAsync
                        (contramap (TraceLabelPeer peerno) clientMsgTracer)
                        (contramap (TraceLabelPeer peerno) serverMsgTracer)
                        (maxBound :: NodeToNodeVersion)
                        clientDelay serverDelay
                        registry peerno
                        (blockFetchClient (maxBound :: NodeToNodeVersion) controlMessageSTM nullTracer)
                        (mockBlockFetchServer1 candidateChain)
                    | (peerno, candidateChain) <- zip [1..] candidateChains
                    ]
    fetchAsync  <- async $ do
      threadId <- myThreadId
      labelThread threadId "block-fetch-logic"
      blockFetch registry blockHeap
    driverAsync <- async $ do
      threadId <- myThreadId
      labelThread threadId "block-fetch-driver"
      downloadTimer

    -- Order of shutdown here is important for this example: must kill off the
    -- fetch thread before the peer threads.
    _ <- waitAnyCancel $ [ fetchAsync, driverAsync ]
                      ++ [ peerAsync
                         | (_, server, sync, ks) <- peerAsyncs
                         , peerAsync <- [server, sync, ks] ]

    -- let the client side protocols terminate gracefully.
    atomically $ writeTVar controlMessageVar Terminate
    traverse_ (\(client,_,_,_) -> waitCatch client) peerAsyncs

  where
    -- Server side messages are not traced.
    serverMsgTracer = nullTracer

    currentChainHeaders :: AnchoredFragment BlockHeader
    currentChainHeaders =
      AnchoredFragment.mapAnchoredFragment blockHeader currentChain

    -- Candidate header chains keyed by peer number (numbering starts at 1,
    -- matching the numbering used when starting the peers).
    candidateChainHeaders :: Map Int (AnchoredFragment BlockHeader)
    candidateChainHeaders =
      Map.fromList $ zip [1..] $
      map (AnchoredFragment.mapAnchoredFragment blockHeader) candidateChains

    -- The anchor point followed by the points of all blocks, oldest first.
    anchoredChainPoints c = anchorPoint c
                          : map blockPoint (AnchoredFragment.toOldestFirst c)

    -- Run the block fetch decision logic with a fixed configuration,
    -- discarding its result.
    blockFetch :: FetchClientRegistry Int BlockHeader Block m
               -> TestFetchedBlockHeap m Block
               -> m ()
    blockFetch registry blockHeap =
        blockFetchLogic
          decisionTracer clientStateTracer
          (sampleBlockFetchPolicy1 fetchMode headerForgeUTCTime blockHeap currentChainHeaders candidateChainHeaders)
          registry
          (BlockFetchConfiguration {
            bfcMaxConcurrencyBulkSync = 1,
            bfcMaxConcurrencyDeadline = 2,
            bfcMaxRequestsInflight    = 10,
            bfcDecisionLoopIntervalGenesis = 0.04,
            bfcDecisionLoopIntervalPraos = 0.01,
            bfcSalt                   = 0,
            bfcGenesisBFConfig        = GenesisBlockFetchConfiguration
              { gbfcGracePeriod = 10 -- seconds
              }
          })
        >> return ()

    -- Forge time of a header, derived from its slot number.
    headerForgeUTCTime (FromConsensus x) =
        pure $ convertSlotToTimeForTestsAssumingNoHardFork (blockSlot x)

    -- | Terminates after 1 second per block in the candidate chains.
    downloadTimer :: m ()
    downloadTimer =
      let totalBlocks :: Int
          totalBlocks = sum $ map AF.length candidateChains
       in threadDelay (fromIntegral totalBlocks)

--
-- Sample block fetch configurations
--

-- | A 'BlockFetchConsensusInterface' over fixed inputs.
--
-- The fetch mode, current chain and candidate chains are returned unchanged
-- on every read. Fetched blocks are read from, and added to, the supplied
-- 'TestFetchedBlockHeap'. Every block is reported as 2000 bytes and every
-- block is accepted as matching its header. Candidate chains are judged
-- plausible, and compared with each other, by head block number only.
--
sampleBlockFetchPolicy1 :: (MonadSTM m, HasHeader header, HasHeader block)
                        => FetchMode
                        -> (forall x. HasHeader x => FromConsensus x -> STM m UTCTime)
                        -> TestFetchedBlockHeap m block
                        -> AnchoredFragment header
                        -> Map peer (AnchoredFragment header)
                        -> BlockFetchConsensusInterface peer header block m
sampleBlockFetchPolicy1 fetchMode headerFieldsForgeUTCTime blockHeap currentChain candidateChains =
    BlockFetchConsensusInterface {
      readCandidateChains    = return candidateChains,
      readCurrentChain       = return currentChain,
      readFetchMode          = return fetchMode,
      readFetchedBlocks      = flip Set.member <$>
                                 getTestFetchedBlocks blockHeap,
      -- Largest slot number among the fetched points; 'NoMaxSlotNo' when
      -- nothing has been fetched.
      readFetchedMaxSlotNo   = List.foldl' max NoMaxSlotNo .
                               map (maxSlotNoFromWithOrigin . pointSlot) .
                               Set.elems <$>
                               getTestFetchedBlocks blockHeap,
      mkAddFetchedBlock      = pure $ addTestFetchedBlock blockHeap,

      -- Field puns: bound in the @where@ clause below.
      plausibleCandidateChain,
      compareCandidateChains,

      blockFetchSize         = \_ -> 2000,
      blockMatchesHeader     = \_ _ -> True,

      headerForgeUTCTime     = headerFieldsForgeUTCTime,
      readChainSelStarvation = pure (ChainSelStarvationEndedAt (Time 0)),

      demoteChainSyncJumpingDynamo = \_ -> pure ()
      }
  where
    -- A candidate is plausible iff its head block number is strictly greater
    -- than that of the current chain.
    plausibleCandidateChain cur candidate =
      AnchoredFragment.headBlockNo candidate > AnchoredFragment.headBlockNo cur

    -- Order candidates by head block number.
    compareCandidateChains c1 c2 =
      AnchoredFragment.headBlockNo c1 `compare` AnchoredFragment.headBlockNo c2

-- | Fixed 'PeerGSV' used by the examples.
--
-- Roughly 10ms ping time and 1MBit\/s bandwidth, leads to ~2200 bytes in
-- flight minimum.
--
-- The same 'GSV' is used for both the inbound and the outbound direction,
-- and the sample time is fixed at @Time 0@.
--
exampleFixedPeerGSVs :: PeerGSV
exampleFixedPeerGSVs =
    PeerGSV{sampleTime, outboundGSV, inboundGSV}
  where
    -- 'ballisticGSV' applied to 10e-3, 10e-6 and a degenerate (constant 0)
    -- distribution.
    inboundGSV  = ballisticGSV 10e-3 10e-6 (degenerateDistribution 0)
    -- Symmetric link: outbound mirrors inbound.
    outboundGSV = inboundGSV
    sampleTime  = Time 0


--
-- Utils to run fetch clients and servers
--

-- | Run a block fetch client over the given 'Channel'.
--
-- The client is run inside 'bracketFetchClient' for the given registry,
-- version and peer; the 'FetchClientContext' supplied by the bracket is
-- used to build the pipelined client, which is then driven by
-- 'runPipelinedPeerWithLimits' using 'byteLimitsBlockFetch' and
-- 'timeLimitsBlockFetch'.
--
-- Blocks and points are encoded and decoded with their 'Serialise'
-- instances.  The driver returns a pair; only the client's result is
-- returned, the @Maybe ByteString@ component (presumably unconsumed input)
-- is dropped.
--
runFetchClient :: ( MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m)
                  , MonadST m, MonadTime m, MonadTimer m
                  , Ord peerid
                  , Serialise block, Serialise point
                  , ShowProxy block
                  )
                => Tracer m (TraceSendRecv (BlockFetch block point))
                -> version
                -> FetchClientRegistry peerid header block m
                -> peerid
                -> Channel m LBS.ByteString
                -> (  FetchClientContext header block m
                   -> ClientPipelined (BlockFetch block point) BFIdle m a)
                -> m a
runFetchClient tracer version registry peerid channel client =
    bracketFetchClient registry version peerid $ \clientCtx ->
      fst <$>
        runPipelinedPeerWithLimits
          tracer
          codec
          -- message size is measured as the length of the lazy bytestring
          (byteLimitsBlockFetch (fromIntegral . LBS.length))
          timeLimitsBlockFetch
          channel
          (client clientCtx)
  where
    codec = codecBlockFetch encode decode encode decode

-- | Run a block fetch server over the given 'Channel'.
--
-- The 'BlockFetchServer' is converted to a protocol peer with
-- 'blockFetchServerPeer' and driven by 'runPeerWithLimits' using
-- 'byteLimitsBlockFetch' and 'timeLimitsBlockFetch'.
--
-- Blocks and points are encoded and decoded with their 'Serialise'
-- instances.  The driver returns a pair; only the server's result is
-- returned, the @Maybe ByteString@ component (presumably unconsumed input)
-- is dropped.
--
runFetchServer :: (MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
                   MonadST m, MonadTime m, MonadTimer m,
                   Serialise block, Serialise point,
                   ShowProxy block)
                => Tracer m (TraceSendRecv (BlockFetch block point))
                -> Channel m LBS.ByteString
                -> BlockFetchServer block point m a
                -> m a
runFetchServer tracer channel server =
    fst <$>
      runPeerWithLimits
        tracer
        codec
        -- message size is measured as the length of the lazy bytestring
        (byteLimitsBlockFetch (fromIntegral . LBS.length))
        timeLimitsBlockFetch
        channel
        (blockFetchServerPeer server)
  where
    codec = codecBlockFetch encode decode encode decode

-- | Start a block fetch client and server connected to each other, together
-- with the two auxiliary registry threads, and return their 'Async' handles.
--
-- A pair of connected channels is created with 'createConnectedChannels'.
-- Four threads are then spawned, each of which labels itself with a name
-- derived from @show peerid@:
--
-- * the client, run with 'runFetchClient' on the client end of the channel;
-- * the server, run with 'runFetchServer' on the server end of the channel;
-- * a thread bracketed by 'bracketSyncWithFetchClient' which only sleeps in
--   a loop;
-- * a thread bracketed by 'bracketKeepAliveClient' which ignores the
--   variable it is handed and only sleeps in a loop.
--
-- When a channel delay is given, the corresponding channel end is wrapped
-- with 'delayChannel'; otherwise it is used as is.
--
-- This function does not wait for or cancel any of the threads; that is left
-- to the caller via the returned handles (client, server, sync, keep-alive).
--
runFetchClientAndServerAsync
               :: forall peerid block header version m a b.
                  (MonadAsync m, MonadDelay m, MonadFork m, MonadMask m,
                   MonadThrow (STM m), MonadST m, MonadTime m, MonadTimer m,
                   Ord peerid, Show peerid,
                   Serialise block, Serialise (HeaderHash block),
                   ShowProxy block)
                => Tracer m (TraceSendRecv (BlockFetch block (Point block)))
                -> Tracer m (TraceSendRecv (BlockFetch block (Point block)))
                -> version
                -> Maybe DiffTime -- ^ client's channel delay
                -> Maybe DiffTime -- ^ server's channel delay
                -> FetchClientRegistry peerid header block m
                -> peerid
                -> (  FetchClientContext header block m
                   -> ClientPipelined (BlockFetch block (Point block)) BFIdle m a)
                -> BlockFetchServer block (Point block) m b
                -> m (Async m a, Async m b, Async m (), Async m ())
runFetchClientAndServerAsync clientTracer serverTracer
                             version
                             clientDelay  serverDelay
                             registry peerid client server = do
    (clientChannel, serverChannel) <- createConnectedChannels

    clientAsync <- async $ do
      threadId <- myThreadId
      labelThread threadId ("block-fetch-client-" ++ show peerid)
      runFetchClient
        clientTracer
        version
        registry peerid
        (maybe id delayChannel clientDelay clientChannel)
        client

    serverAsync <- async $ do
      threadId <- myThreadId
      labelThread threadId ("block-fetch-server-" ++ show peerid)
      runFetchServer
        serverTracer
        (maybe id delayChannel serverDelay serverChannel)
        server

    -- We are tagging messages with the current peerid, not the target one.
    -- This differs from what is intended, but it is fine to do that in
    -- these examples.
    syncClientAsync <- async $ do
      threadId <- myThreadId
      labelThread threadId ("registry-" ++ show peerid)
      bracketSyncWithFetchClient
        registry peerid
        (forever (threadDelay 1000) >> return ())
    keepAliveAsync <- async $ do
      threadId <- myThreadId
      labelThread threadId ("keep-alive-" ++ show peerid)
      bracketKeepAliveClient
        registry peerid
        (\_ -> forever (threadDelay 1000) >> return ())

    return (clientAsync, serverAsync, syncClientAsync, keepAliveAsync)


--
-- Mock block fetch servers
--

-- | A demo server for the block fetch protocol.
--
-- It serves up ranges on a single given 'AnchoredFragment'. It does not
-- simulate any delays, so is not suitable for timing-accurate simulations.
--
-- For every requested 'ChainRange' the fragment is sliced with
-- 'AnchoredFragment.sliceRange':
--
-- * if the slice does not exist, the server replies with 'SendMsgNoBlocks';
-- * otherwise it replies with 'SendMsgStartBatch', sends the blocks of the
--   slice oldest first, one 'SendMsgBlock' each, and finishes with
--   'SendMsgBatchDone'.
--
-- In both cases the server then goes back to waiting for the next request.
--
mockBlockFetchServer1 :: forall block m.
                        (MonadSTM m, HasHeader block)
                      => AnchoredFragment block
                      -> BlockFetchServer block (Point block) m ()
mockBlockFetchServer1 chain =
    senderSide
  where
    -- The idle server: handles a request with 'receiveReq' and has @()@ as
    -- its final result.
    senderSide :: BlockFetchServer block (Point block) m ()
    senderSide = BlockFetchServer receiveReq ()

    -- Answer a single range request.
    receiveReq :: ChainRange (Point block)
               -> m (BlockFetchBlockSender block (Point block) m ())
    receiveReq (ChainRange lpoint upoint) =
      -- We can only assert this for tests, not for the real thing.
      assert (pointSlot lpoint <= pointSlot upoint) $
      case AnchoredFragment.sliceRange chain lpoint upoint of
        Nothing     -> return $ SendMsgNoBlocks (return senderSide)
        Just chain' -> return $ SendMsgStartBatch (sendBlocks blocks)
          where blocks = AnchoredFragment.toOldestFirst chain'

    -- Stream the given blocks in order, then end the batch and return to
    -- 'senderSide'.
    sendBlocks :: [block] -> m (BlockFetchSendBlocks block (Point block) m ())
    sendBlocks []     = return $ SendMsgBatchDone (return senderSide)
    sendBlocks (b:bs) = return $ SendMsgBlock b (sendBlocks bs)


--
-- Mock downloaded block heap
--

-- | This provides an interface to a collection of downloaded blocks. This is
-- enough to implement the 'addFetchedBlock' and 'readFetchedBlocks' methods
-- in the 'BlockFetchConsensusInterface' and related interfaces.
--
-- The interface is enough to use in examples and tests.
--
data TestFetchedBlockHeap m block = TestFetchedBlockHeap {
       -- | Read the set of points of the blocks in the heap.
       getTestFetchedBlocks :: STM m (Set (Point block)),
       -- | Record a block, given together with its point.
       addTestFetchedBlock  :: Point block -> block -> m ()
     }

-- | Make a 'TestFetchedBlockHeap' using a simple in-memory 'Set' of points,
-- stored in an 'STM' 'TVar'.
--
-- The heap initially contains the given points.  Adding a block inserts
-- only its point into the set; the block itself is not stored.
--
-- This is suitable for examples and tests.
--
mkTestFetchedBlockHeap :: (MonadSTM m, Ord (Point block))
                       => [Point block]
                       -> m (TestFetchedBlockHeap m block)
mkTestFetchedBlockHeap points = do
    v <- newTVarIO (Set.fromList points)
    return TestFetchedBlockHeap {
      getTestFetchedBlocks = readTVar v,
      -- the block body is ignored, only its point is remembered
      addTestFetchedBlock  = \p _b -> atomically (modifyTVar v (Set.insert p))
    }