{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE Rank2Types #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
module Ouroboros.Network.BlockFetch.Examples
( blockFetchExample0
, blockFetchExample1
, mockBlockFetchServer1
, exampleFixedPeerGSVs
) where
import Codec.Serialise (Serialise (..))
import Data.ByteString.Lazy qualified as LBS
import Data.Foldable (traverse_)
import Data.List as List (foldl')
import Data.Map (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe (fromMaybe)
import Data.Set (Set)
import Data.Set qualified as Set
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Exception (assert)
import Control.Monad (forever)
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadST
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer, contramap, nullTracer)
import Ouroboros.Network.AnchoredFragment (AnchoredFragment, anchorPoint)
import Ouroboros.Network.AnchoredFragment qualified as AnchoredFragment
import Ouroboros.Network.Block
import Network.TypedProtocol.Peer.Client
import Ouroboros.Network.AnchoredFragment qualified as AF
import Ouroboros.Network.BlockFetch
import Ouroboros.Network.BlockFetch.Client
import Ouroboros.Network.BlockFetch.ConsensusInterface (ChainSelStarvation (..))
import Ouroboros.Network.Channel
import Ouroboros.Network.ControlMessage
import Ouroboros.Network.DeltaQ
import Ouroboros.Network.Driver
import Ouroboros.Network.NodeToNode (NodeToNodeVersion (..))
import Ouroboros.Network.Protocol.BlockFetch.Codec
import Ouroboros.Network.Protocol.BlockFetch.Server
import Ouroboros.Network.Protocol.BlockFetch.Type
import Ouroboros.Network.Util.ShowProxy
import Ouroboros.Network.BlockFetch.Decision.Trace (TraceDecisionEvent)
import Ouroboros.Network.Mock.ConcreteBlock
blockFetchExample0 :: forall m.
(MonadST m, MonadAsync m, MonadDelay m, MonadFork m,
MonadTime m, MonadTimer m, MonadMask m, MonadThrow (STM m))
=> FetchMode
-> Tracer m (TraceDecisionEvent Int BlockHeader)
-> Tracer m (TraceLabelPeer Int
(TraceFetchClientState BlockHeader))
-> Tracer m (TraceLabelPeer Int
(TraceSendRecv (BlockFetch Block (Point Block))))
-> Maybe DiffTime
-> Maybe DiffTime
-> ControlMessageSTM m
-> AnchoredFragment Block
-> AnchoredFragment Block
-> m ()
blockFetchExample0 :: forall (m :: * -> *).
(MonadST m, MonadAsync m, MonadDelay m, MonadFork m, MonadTime m,
MonadTimer m, MonadMask m, MonadThrow (STM m)) =>
FetchMode
-> Tracer m (TraceDecisionEvent Int BlockHeader)
-> Tracer
m (TraceLabelPeer Int (TraceFetchClientState BlockHeader))
-> Tracer
m
(TraceLabelPeer
Int (TraceSendRecv (BlockFetch Block (Point Block))))
-> Maybe DiffTime
-> Maybe DiffTime
-> ControlMessageSTM m
-> AnchoredFragment Block
-> AnchoredFragment Block
-> m ()
blockFetchExample0 FetchMode
fetchMode Tracer m (TraceDecisionEvent Int BlockHeader)
decisionTracer Tracer m (TraceLabelPeer Int (TraceFetchClientState BlockHeader))
clientStateTracer Tracer
m
(TraceLabelPeer
Int (TraceSendRecv (BlockFetch Block (Point Block))))
clientMsgTracer
Maybe DiffTime
clientDelay Maybe DiffTime
serverDelay
ControlMessageSTM m
controlMessageSTM
AnchoredFragment Block
currentChain AnchoredFragment Block
candidateChain = do
registry <- m (FetchClientRegistry Int BlockHeader Block m)
forall (m :: * -> *) peer header block.
MonadSTM m =>
m (FetchClientRegistry peer header block m)
newFetchClientRegistry :: m (FetchClientRegistry Int BlockHeader Block m)
blockHeap <- mkTestFetchedBlockHeap (anchoredChainPoints currentChain)
(clientAsync, serverAsync, syncClientAsync, keepAliveAsync)
<- runFetchClientAndServerAsync
(contramap (TraceLabelPeer peerno) clientMsgTracer)
(contramap (TraceLabelPeer peerno) serverMsgTracer)
(maxBound :: NodeToNodeVersion)
clientDelay serverDelay
registry peerno
(blockFetchClient (maxBound :: NodeToNodeVersion) controlMessageSTM nullTracer)
(mockBlockFetchServer1 candidateChain)
fetchAsync <- async $ do
threadId <- myThreadId
labelThread threadId "block-fetch-logic"
blockFetch registry blockHeap
driverAsync <- async $ do
threadId <- myThreadId
labelThread threadId "driver"
driver blockHeap
atomically $ controlMessageSTM >>= check . (Terminate ==)
cancel fetchAsync
_ <- waitAnyCancel [ driverAsync, clientAsync, serverAsync
, syncClientAsync, keepAliveAsync
]
return ()
where
peerno :: Int
peerno = Int
1
serverMsgTracer :: Tracer m a
serverMsgTracer = Tracer m a
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
currentChainHeaders :: AnchoredFragment BlockHeader
currentChainHeaders =
(Block -> BlockHeader)
-> AnchoredFragment Block -> AnchoredFragment BlockHeader
forall block2 block1.
(HasHeader block2, HeaderHash block1 ~ HeaderHash block2) =>
(block1 -> block2)
-> AnchoredFragment block1 -> AnchoredFragment block2
AnchoredFragment.mapAnchoredFragment Block -> BlockHeader
blockHeader AnchoredFragment Block
currentChain
candidateChainHeaders :: Map Int (AnchoredFragment BlockHeader)
candidateChainHeaders =
[(Int, AnchoredFragment BlockHeader)]
-> Map Int (AnchoredFragment BlockHeader)
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList ([(Int, AnchoredFragment BlockHeader)]
-> Map Int (AnchoredFragment BlockHeader))
-> [(Int, AnchoredFragment BlockHeader)]
-> Map Int (AnchoredFragment BlockHeader)
forall a b. (a -> b) -> a -> b
$ [Int]
-> [AnchoredFragment BlockHeader]
-> [(Int, AnchoredFragment BlockHeader)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Int
1..] ([AnchoredFragment BlockHeader]
-> [(Int, AnchoredFragment BlockHeader)])
-> [AnchoredFragment BlockHeader]
-> [(Int, AnchoredFragment BlockHeader)]
forall a b. (a -> b) -> a -> b
$
(AnchoredFragment Block -> AnchoredFragment BlockHeader)
-> [AnchoredFragment Block] -> [AnchoredFragment BlockHeader]
forall a b. (a -> b) -> [a] -> [b]
map ((Block -> BlockHeader)
-> AnchoredFragment Block -> AnchoredFragment BlockHeader
forall block2 block1.
(HasHeader block2, HeaderHash block1 ~ HeaderHash block2) =>
(block1 -> block2)
-> AnchoredFragment block1 -> AnchoredFragment block2
AnchoredFragment.mapAnchoredFragment Block -> BlockHeader
blockHeader) [AnchoredFragment Block
candidateChain]
anchoredChainPoints :: AnchoredFragment block -> [Point block]
anchoredChainPoints AnchoredFragment block
c = AnchoredFragment block -> Point block
forall block. AnchoredFragment block -> Point block
anchorPoint AnchoredFragment block
c
Point block -> [Point block] -> [Point block]
forall a. a -> [a] -> [a]
: (block -> Point block) -> [block] -> [Point block]
forall a b. (a -> b) -> [a] -> [b]
map block -> Point block
forall block. HasHeader block => block -> Point block
blockPoint (AnchoredFragment block -> [block]
forall v a b. AnchoredSeq v a b -> [b]
AnchoredFragment.toOldestFirst AnchoredFragment block
c)
blockFetch :: FetchClientRegistry Int BlockHeader Block m
-> TestFetchedBlockHeap m Block
-> m ()
blockFetch :: FetchClientRegistry Int BlockHeader Block m
-> TestFetchedBlockHeap m Block -> m ()
blockFetch FetchClientRegistry Int BlockHeader Block m
registry TestFetchedBlockHeap m Block
blockHeap =
Tracer m (TraceDecisionEvent Int BlockHeader)
-> Tracer
m (TraceLabelPeer Int (TraceFetchClientState BlockHeader))
-> BlockFetchConsensusInterface Int BlockHeader Block m
-> FetchClientRegistry Int BlockHeader Block m
-> BlockFetchConfiguration
-> m Void
forall addr header block (m :: * -> *).
(HasHeader header, HasHeader block,
HeaderHash header ~ HeaderHash block, MonadDelay m, MonadTimer m,
Ord addr, Hashable addr) =>
Tracer m (TraceDecisionEvent addr header)
-> Tracer m (TraceLabelPeer addr (TraceFetchClientState header))
-> BlockFetchConsensusInterface addr header block m
-> FetchClientRegistry addr header block m
-> BlockFetchConfiguration
-> m Void
blockFetchLogic
Tracer m (TraceDecisionEvent Int BlockHeader)
decisionTracer Tracer m (TraceLabelPeer Int (TraceFetchClientState BlockHeader))
clientStateTracer
(FetchMode
-> (forall x. HasHeader x => FromConsensus x -> STM m UTCTime)
-> TestFetchedBlockHeap m Block
-> AnchoredFragment BlockHeader
-> Map Int (AnchoredFragment BlockHeader)
-> BlockFetchConsensusInterface Int BlockHeader Block m
forall (m :: * -> *) header block peer.
(MonadSTM m, HasHeader header, HasHeader block) =>
FetchMode
-> (forall x. HasHeader x => FromConsensus x -> STM m UTCTime)
-> TestFetchedBlockHeap m block
-> AnchoredFragment header
-> Map peer (AnchoredFragment header)
-> BlockFetchConsensusInterface peer header block m
sampleBlockFetchPolicy1 FetchMode
fetchMode FromConsensus x -> STM m UTCTime
forall x. HasHeader x => FromConsensus x -> STM m UTCTime
forall {f :: * -> *} {b}.
(Applicative f, HasHeader b) =>
FromConsensus b -> f UTCTime
headerForgeUTCTime TestFetchedBlockHeap m Block
blockHeap AnchoredFragment BlockHeader
currentChainHeaders Map Int (AnchoredFragment BlockHeader)
candidateChainHeaders)
FetchClientRegistry Int BlockHeader Block m
registry
(BlockFetchConfiguration {
bfcMaxConcurrencyBulkSync :: Word
bfcMaxConcurrencyBulkSync = Word
1,
bfcMaxConcurrencyDeadline :: Word
bfcMaxConcurrencyDeadline = Word
2,
bfcMaxRequestsInflight :: Word
bfcMaxRequestsInflight = Word
10,
bfcDecisionLoopIntervalGenesis :: DiffTime
bfcDecisionLoopIntervalGenesis = DiffTime
0.04,
bfcDecisionLoopIntervalPraos :: DiffTime
bfcDecisionLoopIntervalPraos = DiffTime
0.01,
bfcSalt :: Int
bfcSalt = Int
0,
bfcGenesisBFConfig :: GenesisBlockFetchConfiguration
bfcGenesisBFConfig = GenesisBlockFetchConfiguration
{ gbfcGracePeriod :: DiffTime
gbfcGracePeriod = DiffTime
10
}
})
m Void -> m () -> m ()
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
headerForgeUTCTime :: FromConsensus b -> f UTCTime
headerForgeUTCTime (FromConsensus b
x) =
UTCTime -> f UTCTime
forall a. a -> f a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (UTCTime -> f UTCTime) -> UTCTime -> f UTCTime
forall a b. (a -> b) -> a -> b
$ SlotNo -> UTCTime
convertSlotToTimeForTestsAssumingNoHardFork (b -> SlotNo
forall b. HasHeader b => b -> SlotNo
blockSlot b
x)
driver :: TestFetchedBlockHeap m Block -> m ()
driver :: TestFetchedBlockHeap m Block -> m ()
driver TestFetchedBlockHeap m Block
blockHeap = do
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
heap <- TestFetchedBlockHeap m Block -> STM m (Set (Point Block))
forall (m :: * -> *) block.
TestFetchedBlockHeap m block -> STM m (Set (Point block))
getTestFetchedBlocks TestFetchedBlockHeap m Block
blockHeap
check $
all (\AnchoredFragment Block
c -> AnchoredFragment Block -> Point Block
forall block.
HasHeader block =>
AnchoredFragment block -> Point block
AnchoredFragment.headPoint AnchoredFragment Block
c Point Block -> Set (Point Block) -> Bool
forall a. Ord a => a -> Set a -> Bool
`Set.member` Set (Point Block)
heap)
[candidateChain]
blockFetchExample1 :: forall m.
(MonadST m, MonadAsync m, MonadDelay m, MonadFork m,
MonadTime m, MonadTimer m, MonadMask m, MonadThrow (STM m))
=> FetchMode
-> Tracer m (TraceDecisionEvent Int BlockHeader)
-> Tracer m (TraceLabelPeer Int
(TraceFetchClientState BlockHeader))
-> Tracer m (TraceLabelPeer Int
(TraceSendRecv (BlockFetch Block (Point Block))))
-> Maybe DiffTime
-> Maybe DiffTime
-> AnchoredFragment Block
-> [AnchoredFragment Block]
-> m ()
blockFetchExample1 :: forall (m :: * -> *).
(MonadST m, MonadAsync m, MonadDelay m, MonadFork m, MonadTime m,
MonadTimer m, MonadMask m, MonadThrow (STM m)) =>
FetchMode
-> Tracer m (TraceDecisionEvent Int BlockHeader)
-> Tracer
m (TraceLabelPeer Int (TraceFetchClientState BlockHeader))
-> Tracer
m
(TraceLabelPeer
Int (TraceSendRecv (BlockFetch Block (Point Block))))
-> Maybe DiffTime
-> Maybe DiffTime
-> AnchoredFragment Block
-> [AnchoredFragment Block]
-> m ()
blockFetchExample1 FetchMode
fetchMode Tracer m (TraceDecisionEvent Int BlockHeader)
decisionTracer Tracer m (TraceLabelPeer Int (TraceFetchClientState BlockHeader))
clientStateTracer Tracer
m
(TraceLabelPeer
Int (TraceSendRecv (BlockFetch Block (Point Block))))
clientMsgTracer
Maybe DiffTime
clientDelay Maybe DiffTime
serverDelay
AnchoredFragment Block
currentChain [AnchoredFragment Block]
candidateChains = do
controlMessageVar <- ControlMessage -> m (StrictTVar m ControlMessage)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ControlMessage
Continue
let controlMessageSTM = StrictTVar m ControlMessage -> STM m ControlMessage
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m ControlMessage
controlMessageVar
registry <- newFetchClientRegistry
blockHeap <- mkTestFetchedBlockHeap (anchoredChainPoints currentChain)
peerAsyncs <- sequence
[ runFetchClientAndServerAsync
(contramap (TraceLabelPeer peerno) clientMsgTracer)
(contramap (TraceLabelPeer peerno) serverMsgTracer)
(maxBound :: NodeToNodeVersion)
clientDelay serverDelay
registry peerno
(blockFetchClient (maxBound :: NodeToNodeVersion) controlMessageSTM nullTracer)
(mockBlockFetchServer1 candidateChain)
| (peerno, candidateChain) <- zip [1..] candidateChains
]
fetchAsync <- async $ do
threadId <- myThreadId
labelThread threadId "block-fetch-logic"
blockFetch registry blockHeap
driverAsync <- async $ do
threadId <- myThreadId
labelThread threadId "block-fetch-driver"
downloadTimer
_ <- waitAnyCancel $ [ fetchAsync, driverAsync ]
++ [ peerAsync
| (_, server, sync, ks) <- peerAsyncs
, peerAsync <- [server, sync, ks] ]
atomically $ writeTVar controlMessageVar Terminate
traverse_ (\(Async m ()
client,Async m ()
_,Async m ()
_,Async m ()
_) -> Async m () -> m (Either SomeException ())
forall a. Async m a -> m (Either SomeException a)
forall (m :: * -> *) a.
MonadAsync m =>
Async m a -> m (Either SomeException a)
waitCatch Async m ()
client) peerAsyncs
where
serverMsgTracer :: Tracer m a
serverMsgTracer = Tracer m a
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
currentChainHeaders :: AnchoredFragment BlockHeader
currentChainHeaders =
(Block -> BlockHeader)
-> AnchoredFragment Block -> AnchoredFragment BlockHeader
forall block2 block1.
(HasHeader block2, HeaderHash block1 ~ HeaderHash block2) =>
(block1 -> block2)
-> AnchoredFragment block1 -> AnchoredFragment block2
AnchoredFragment.mapAnchoredFragment Block -> BlockHeader
blockHeader AnchoredFragment Block
currentChain
candidateChainHeaders :: Map Int (AnchoredFragment BlockHeader)
candidateChainHeaders =
[(Int, AnchoredFragment BlockHeader)]
-> Map Int (AnchoredFragment BlockHeader)
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList ([(Int, AnchoredFragment BlockHeader)]
-> Map Int (AnchoredFragment BlockHeader))
-> [(Int, AnchoredFragment BlockHeader)]
-> Map Int (AnchoredFragment BlockHeader)
forall a b. (a -> b) -> a -> b
$ [Int]
-> [AnchoredFragment BlockHeader]
-> [(Int, AnchoredFragment BlockHeader)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Int
1..] ([AnchoredFragment BlockHeader]
-> [(Int, AnchoredFragment BlockHeader)])
-> [AnchoredFragment BlockHeader]
-> [(Int, AnchoredFragment BlockHeader)]
forall a b. (a -> b) -> a -> b
$
(AnchoredFragment Block -> AnchoredFragment BlockHeader)
-> [AnchoredFragment Block] -> [AnchoredFragment BlockHeader]
forall a b. (a -> b) -> [a] -> [b]
map ((Block -> BlockHeader)
-> AnchoredFragment Block -> AnchoredFragment BlockHeader
forall block2 block1.
(HasHeader block2, HeaderHash block1 ~ HeaderHash block2) =>
(block1 -> block2)
-> AnchoredFragment block1 -> AnchoredFragment block2
AnchoredFragment.mapAnchoredFragment Block -> BlockHeader
blockHeader) [AnchoredFragment Block]
candidateChains
anchoredChainPoints :: AnchoredFragment block -> [Point block]
anchoredChainPoints AnchoredFragment block
c = AnchoredFragment block -> Point block
forall block. AnchoredFragment block -> Point block
anchorPoint AnchoredFragment block
c
Point block -> [Point block] -> [Point block]
forall a. a -> [a] -> [a]
: (block -> Point block) -> [block] -> [Point block]
forall a b. (a -> b) -> [a] -> [b]
map block -> Point block
forall block. HasHeader block => block -> Point block
blockPoint (AnchoredFragment block -> [block]
forall v a b. AnchoredSeq v a b -> [b]
AnchoredFragment.toOldestFirst AnchoredFragment block
c)
blockFetch :: FetchClientRegistry Int BlockHeader Block m
-> TestFetchedBlockHeap m Block
-> m ()
blockFetch :: FetchClientRegistry Int BlockHeader Block m
-> TestFetchedBlockHeap m Block -> m ()
blockFetch FetchClientRegistry Int BlockHeader Block m
registry TestFetchedBlockHeap m Block
blockHeap =
Tracer m (TraceDecisionEvent Int BlockHeader)
-> Tracer
m (TraceLabelPeer Int (TraceFetchClientState BlockHeader))
-> BlockFetchConsensusInterface Int BlockHeader Block m
-> FetchClientRegistry Int BlockHeader Block m
-> BlockFetchConfiguration
-> m Void
forall addr header block (m :: * -> *).
(HasHeader header, HasHeader block,
HeaderHash header ~ HeaderHash block, MonadDelay m, MonadTimer m,
Ord addr, Hashable addr) =>
Tracer m (TraceDecisionEvent addr header)
-> Tracer m (TraceLabelPeer addr (TraceFetchClientState header))
-> BlockFetchConsensusInterface addr header block m
-> FetchClientRegistry addr header block m
-> BlockFetchConfiguration
-> m Void
blockFetchLogic
Tracer m (TraceDecisionEvent Int BlockHeader)
decisionTracer Tracer m (TraceLabelPeer Int (TraceFetchClientState BlockHeader))
clientStateTracer
(FetchMode
-> (forall x. HasHeader x => FromConsensus x -> STM m UTCTime)
-> TestFetchedBlockHeap m Block
-> AnchoredFragment BlockHeader
-> Map Int (AnchoredFragment BlockHeader)
-> BlockFetchConsensusInterface Int BlockHeader Block m
forall (m :: * -> *) header block peer.
(MonadSTM m, HasHeader header, HasHeader block) =>
FetchMode
-> (forall x. HasHeader x => FromConsensus x -> STM m UTCTime)
-> TestFetchedBlockHeap m block
-> AnchoredFragment header
-> Map peer (AnchoredFragment header)
-> BlockFetchConsensusInterface peer header block m
sampleBlockFetchPolicy1 FetchMode
fetchMode FromConsensus x -> STM m UTCTime
forall x. HasHeader x => FromConsensus x -> STM m UTCTime
forall {f :: * -> *} {b}.
(Applicative f, HasHeader b) =>
FromConsensus b -> f UTCTime
headerForgeUTCTime TestFetchedBlockHeap m Block
blockHeap AnchoredFragment BlockHeader
currentChainHeaders Map Int (AnchoredFragment BlockHeader)
candidateChainHeaders)
FetchClientRegistry Int BlockHeader Block m
registry
(BlockFetchConfiguration {
bfcMaxConcurrencyBulkSync :: Word
bfcMaxConcurrencyBulkSync = Word
1,
bfcMaxConcurrencyDeadline :: Word
bfcMaxConcurrencyDeadline = Word
2,
bfcMaxRequestsInflight :: Word
bfcMaxRequestsInflight = Word
10,
bfcDecisionLoopIntervalGenesis :: DiffTime
bfcDecisionLoopIntervalGenesis = DiffTime
0.04,
bfcDecisionLoopIntervalPraos :: DiffTime
bfcDecisionLoopIntervalPraos = DiffTime
0.01,
bfcSalt :: Int
bfcSalt = Int
0,
bfcGenesisBFConfig :: GenesisBlockFetchConfiguration
bfcGenesisBFConfig = GenesisBlockFetchConfiguration
{ gbfcGracePeriod :: DiffTime
gbfcGracePeriod = DiffTime
10
}
})
m Void -> m () -> m ()
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
headerForgeUTCTime :: FromConsensus b -> f UTCTime
headerForgeUTCTime (FromConsensus b
x) =
UTCTime -> f UTCTime
forall a. a -> f a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (UTCTime -> f UTCTime) -> UTCTime -> f UTCTime
forall a b. (a -> b) -> a -> b
$ SlotNo -> UTCTime
convertSlotToTimeForTestsAssumingNoHardFork (b -> SlotNo
forall b. HasHeader b => b -> SlotNo
blockSlot b
x)
downloadTimer :: m ()
downloadTimer :: m ()
downloadTimer =
let totalBlocks :: Int
totalBlocks = [Int] -> Int
forall a. Num a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum ([Int] -> Int) -> [Int] -> Int
forall a b. (a -> b) -> a -> b
$ (AnchoredFragment Block -> Int)
-> [AnchoredFragment Block] -> [Int]
forall a b. (a -> b) -> [a] -> [b]
map AnchoredFragment Block -> Int
forall v a b. Anchorable v a b => AnchoredSeq v a b -> Int
AF.length [AnchoredFragment Block]
candidateChains
in DiffTime -> m ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay (Int -> DiffTime
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
totalBlocks)
sampleBlockFetchPolicy1 :: (MonadSTM m, HasHeader header, HasHeader block)
=> FetchMode
-> (forall x. HasHeader x => FromConsensus x -> STM m UTCTime)
-> TestFetchedBlockHeap m block
-> AnchoredFragment header
-> Map peer (AnchoredFragment header)
-> BlockFetchConsensusInterface peer header block m
sampleBlockFetchPolicy1 :: forall (m :: * -> *) header block peer.
(MonadSTM m, HasHeader header, HasHeader block) =>
FetchMode
-> (forall x. HasHeader x => FromConsensus x -> STM m UTCTime)
-> TestFetchedBlockHeap m block
-> AnchoredFragment header
-> Map peer (AnchoredFragment header)
-> BlockFetchConsensusInterface peer header block m
sampleBlockFetchPolicy1 FetchMode
fetchMode forall x. HasHeader x => FromConsensus x -> STM m UTCTime
headerFieldsForgeUTCTime TestFetchedBlockHeap m block
blockHeap AnchoredFragment header
currentChain Map peer (AnchoredFragment header)
candidateChains =
BlockFetchConsensusInterface {
readCandidateChains :: STM m (Map peer (AnchoredFragment header))
readCandidateChains = Map peer (AnchoredFragment header)
-> STM m (Map peer (AnchoredFragment header))
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return Map peer (AnchoredFragment header)
candidateChains,
readCurrentChain :: STM m (AnchoredFragment header)
readCurrentChain = AnchoredFragment header -> STM m (AnchoredFragment header)
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return AnchoredFragment header
currentChain,
readFetchMode :: STM m FetchMode
readFetchMode = FetchMode -> STM m FetchMode
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return FetchMode
fetchMode,
readFetchedBlocks :: STM m (Point block -> Bool)
readFetchedBlocks = (Point block -> Set (Point block) -> Bool)
-> Set (Point block) -> Point block -> Bool
forall a b c. (a -> b -> c) -> b -> a -> c
flip Point block -> Set (Point block) -> Bool
forall a. Ord a => a -> Set a -> Bool
Set.member (Set (Point block) -> Point block -> Bool)
-> STM m (Set (Point block)) -> STM m (Point block -> Bool)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
TestFetchedBlockHeap m block -> STM m (Set (Point block))
forall (m :: * -> *) block.
TestFetchedBlockHeap m block -> STM m (Set (Point block))
getTestFetchedBlocks TestFetchedBlockHeap m block
blockHeap,
readFetchedMaxSlotNo :: STM m MaxSlotNo
readFetchedMaxSlotNo = (MaxSlotNo -> MaxSlotNo -> MaxSlotNo)
-> MaxSlotNo -> [MaxSlotNo] -> MaxSlotNo
forall b a. (b -> a -> b) -> b -> [a] -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
List.foldl' MaxSlotNo -> MaxSlotNo -> MaxSlotNo
forall a. Ord a => a -> a -> a
max MaxSlotNo
NoMaxSlotNo ([MaxSlotNo] -> MaxSlotNo)
-> (Set (Point block) -> [MaxSlotNo])
-> Set (Point block)
-> MaxSlotNo
forall b c a. (b -> c) -> (a -> b) -> a -> c
.
(Point block -> MaxSlotNo) -> [Point block] -> [MaxSlotNo]
forall a b. (a -> b) -> [a] -> [b]
map (WithOrigin SlotNo -> MaxSlotNo
maxSlotNoFromWithOrigin (WithOrigin SlotNo -> MaxSlotNo)
-> (Point block -> WithOrigin SlotNo) -> Point block -> MaxSlotNo
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Point block -> WithOrigin SlotNo
forall {k} (block :: k). Point block -> WithOrigin SlotNo
pointSlot) ([Point block] -> [MaxSlotNo])
-> (Set (Point block) -> [Point block])
-> Set (Point block)
-> [MaxSlotNo]
forall b c a. (b -> c) -> (a -> b) -> a -> c
.
Set (Point block) -> [Point block]
forall a. Set a -> [a]
Set.elems (Set (Point block) -> MaxSlotNo)
-> STM m (Set (Point block)) -> STM m MaxSlotNo
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
TestFetchedBlockHeap m block -> STM m (Set (Point block))
forall (m :: * -> *) block.
TestFetchedBlockHeap m block -> STM m (Set (Point block))
getTestFetchedBlocks TestFetchedBlockHeap m block
blockHeap,
mkAddFetchedBlock :: STM m (Point block -> block -> m ())
mkAddFetchedBlock = (Point block -> block -> m ())
-> STM m (Point block -> block -> m ())
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((Point block -> block -> m ())
-> STM m (Point block -> block -> m ()))
-> (Point block -> block -> m ())
-> STM m (Point block -> block -> m ())
forall a b. (a -> b) -> a -> b
$ TestFetchedBlockHeap m block -> Point block -> block -> m ()
forall (m :: * -> *) block.
TestFetchedBlockHeap m block -> Point block -> block -> m ()
addTestFetchedBlock TestFetchedBlockHeap m block
blockHeap,
HasCallStack =>
AnchoredFragment header -> AnchoredFragment header -> Bool
AnchoredFragment header -> AnchoredFragment header -> Bool
forall {block} {block}.
(HasHeader block, HasHeader block) =>
AnchoredFragment block -> AnchoredFragment block -> Bool
plausibleCandidateChain :: forall {block} {block}.
(HasHeader block, HasHeader block) =>
AnchoredFragment block -> AnchoredFragment block -> Bool
plausibleCandidateChain :: HasCallStack =>
AnchoredFragment header -> AnchoredFragment header -> Bool
plausibleCandidateChain,
HasCallStack =>
AnchoredFragment header -> AnchoredFragment header -> Ordering
AnchoredFragment header -> AnchoredFragment header -> Ordering
forall {block} {block}.
(HasHeader block, HasHeader block) =>
AnchoredFragment block -> AnchoredFragment block -> Ordering
compareCandidateChains :: forall {block} {block}.
(HasHeader block, HasHeader block) =>
AnchoredFragment block -> AnchoredFragment block -> Ordering
compareCandidateChains :: HasCallStack =>
AnchoredFragment header -> AnchoredFragment header -> Ordering
compareCandidateChains,
blockFetchSize :: header -> SizeInBytes
blockFetchSize = \header
_ -> SizeInBytes
2000,
blockMatchesHeader :: header -> block -> Bool
blockMatchesHeader = \header
_ block
_ -> Bool
True,
headerForgeUTCTime :: FromConsensus header -> STM m UTCTime
headerForgeUTCTime = FromConsensus header -> STM m UTCTime
forall x. HasHeader x => FromConsensus x -> STM m UTCTime
headerFieldsForgeUTCTime,
readChainSelStarvation :: STM m ChainSelStarvation
readChainSelStarvation = ChainSelStarvation -> STM m ChainSelStarvation
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Time -> ChainSelStarvation
ChainSelStarvationEndedAt (DiffTime -> Time
Time DiffTime
0)),
demoteChainSyncJumpingDynamo :: peer -> m ()
demoteChainSyncJumpingDynamo = \peer
_ -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
}
where
plausibleCandidateChain :: AnchoredFragment block -> AnchoredFragment block -> Bool
plausibleCandidateChain AnchoredFragment block
cur AnchoredFragment block
candidate =
AnchoredFragment block -> WithOrigin BlockNo
forall block.
HasHeader block =>
AnchoredFragment block -> WithOrigin BlockNo
AnchoredFragment.headBlockNo AnchoredFragment block
candidate WithOrigin BlockNo -> WithOrigin BlockNo -> Bool
forall a. Ord a => a -> a -> Bool
> AnchoredFragment block -> WithOrigin BlockNo
forall block.
HasHeader block =>
AnchoredFragment block -> WithOrigin BlockNo
AnchoredFragment.headBlockNo AnchoredFragment block
cur
compareCandidateChains :: AnchoredFragment block -> AnchoredFragment block -> Ordering
compareCandidateChains AnchoredFragment block
c1 AnchoredFragment block
c2 =
AnchoredFragment block -> WithOrigin BlockNo
forall block.
HasHeader block =>
AnchoredFragment block -> WithOrigin BlockNo
AnchoredFragment.headBlockNo AnchoredFragment block
c1 WithOrigin BlockNo -> WithOrigin BlockNo -> Ordering
forall a. Ord a => a -> a -> Ordering
`compare` AnchoredFragment block -> WithOrigin BlockNo
forall block.
HasHeader block =>
AnchoredFragment block -> WithOrigin BlockNo
AnchoredFragment.headBlockNo AnchoredFragment block
c2
exampleFixedPeerGSVs :: PeerGSV
exampleFixedPeerGSVs :: PeerGSV
exampleFixedPeerGSVs =
PeerGSV{Time
sampleTime :: Time
sampleTime :: Time
sampleTime, GSV
outboundGSV :: GSV
outboundGSV :: GSV
outboundGSV, GSV
inboundGSV :: GSV
inboundGSV :: GSV
inboundGSV}
where
inboundGSV :: GSV
inboundGSV = DiffTime -> DiffTime -> Distribution DiffTime -> GSV
ballisticGSV DiffTime
10e-3 DiffTime
10e-6 (DiffTime -> Distribution DiffTime
forall n. n -> Distribution n
degenerateDistribution DiffTime
0)
outboundGSV :: GSV
outboundGSV = GSV
inboundGSV
sampleTime :: Time
sampleTime = DiffTime -> Time
Time DiffTime
0
runFetchClient :: (MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
MonadST m, MonadTime m, MonadTimer m, Ord peerid, Serialise
block, Serialise point, ShowProxy block)
=> Tracer m (TraceSendRecv (BlockFetch block point))
-> version
-> FetchClientRegistry peerid header block m
-> peerid
-> Channel m LBS.ByteString
-> ( FetchClientContext header block m
-> ClientPipelined (BlockFetch block point) BFIdle m a)
-> m a
runFetchClient :: forall (m :: * -> *) peerid block point version header a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
MonadST m, MonadTime m, MonadTimer m, Ord peerid, Serialise block,
Serialise point, ShowProxy block) =>
Tracer m (TraceSendRecv (BlockFetch block point))
-> version
-> FetchClientRegistry peerid header block m
-> peerid
-> Channel m ByteString
-> (FetchClientContext header block m
-> ClientPipelined (BlockFetch block point) 'BFIdle m a)
-> m a
runFetchClient Tracer m (TraceSendRecv (BlockFetch block point))
tracer version
version FetchClientRegistry peerid header block m
registry peerid
peerid Channel m ByteString
channel FetchClientContext header block m
-> ClientPipelined (BlockFetch block point) 'BFIdle m a
client =
FetchClientRegistry peerid header block m
-> version
-> peerid
-> (FetchClientContext header block m -> m a)
-> m a
forall (m :: * -> *) a peer header block version.
(MonadFork m, MonadMask m, MonadTimer m, Ord peer) =>
FetchClientRegistry peer header block m
-> version
-> peer
-> (FetchClientContext header block m -> m a)
-> m a
bracketFetchClient FetchClientRegistry peerid header block m
registry version
version peerid
peerid ((FetchClientContext header block m -> m a) -> m a)
-> (FetchClientContext header block m -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \FetchClientContext header block m
clientCtx ->
(a, Maybe ByteString) -> a
forall a b. (a, b) -> a
fst ((a, Maybe ByteString) -> a) -> m (a, Maybe ByteString) -> m a
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
Tracer m (TraceSendRecv (BlockFetch block point))
-> Codec (BlockFetch block point) DeserialiseFailure m ByteString
-> ProtocolSizeLimits (BlockFetch block point) ByteString
-> ProtocolTimeLimits (BlockFetch block point)
-> Channel m ByteString
-> ClientPipelined (BlockFetch block point) 'BFIdle m a
-> m (a, Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
a.
(MonadAsync m, MonadFork m, MonadMask m, MonadTimer m,
MonadThrow (STM m), ShowProxy ps,
forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> PeerPipelined ps pr st m a
-> m (a, Maybe bytes)
runPipelinedPeerWithLimits Tracer m (TraceSendRecv (BlockFetch block point))
tracer Codec (BlockFetch block point) DeserialiseFailure m ByteString
codec ((ByteString -> Word)
-> ProtocolSizeLimits (BlockFetch block point) ByteString
forall {k} {k1} bytes (block :: k) (point :: k1).
(bytes -> Word)
-> ProtocolSizeLimits (BlockFetch block point) bytes
byteLimitsBlockFetch (Int64 -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> Word) -> (ByteString -> Int64) -> ByteString -> Word
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Int64
LBS.length))
ProtocolTimeLimits (BlockFetch block point)
forall {k} {k1} (block :: k) (point :: k1).
ProtocolTimeLimits (BlockFetch block point)
timeLimitsBlockFetch Channel m ByteString
channel (FetchClientContext header block m
-> ClientPipelined (BlockFetch block point) 'BFIdle m a
client FetchClientContext header block m
clientCtx)
where
codec :: Codec (BlockFetch block point) DeserialiseFailure m ByteString
codec = (block -> Encoding)
-> (forall s. Decoder s block)
-> (point -> Encoding)
-> (forall s. Decoder s point)
-> Codec (BlockFetch block point) DeserialiseFailure m ByteString
forall block point (m :: * -> *).
MonadST m =>
(block -> Encoding)
-> (forall s. Decoder s block)
-> (point -> Encoding)
-> (forall s. Decoder s point)
-> Codec (BlockFetch block point) DeserialiseFailure m ByteString
codecBlockFetch block -> Encoding
forall a. Serialise a => a -> Encoding
encode Decoder s block
forall s. Decoder s block
forall a s. Serialise a => Decoder s a
decode point -> Encoding
forall a. Serialise a => a -> Encoding
encode Decoder s point
forall s. Decoder s point
forall a s. Serialise a => Decoder s a
decode
runFetchServer :: (MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
MonadST m, MonadTime m, MonadTimer m,
Serialise block, Serialise point,
ShowProxy block)
=> Tracer m (TraceSendRecv (BlockFetch block point))
-> Channel m LBS.ByteString
-> BlockFetchServer block point m a
-> m a
runFetchServer :: forall (m :: * -> *) block point a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
MonadST m, MonadTime m, MonadTimer m, Serialise block,
Serialise point, ShowProxy block) =>
Tracer m (TraceSendRecv (BlockFetch block point))
-> Channel m ByteString -> BlockFetchServer block point m a -> m a
runFetchServer Tracer m (TraceSendRecv (BlockFetch block point))
tracer Channel m ByteString
channel BlockFetchServer block point m a
server =
(a, Maybe ByteString) -> a
forall a b. (a, b) -> a
fst ((a, Maybe ByteString) -> a) -> m (a, Maybe ByteString) -> m a
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
Tracer m (TraceSendRecv (BlockFetch block point))
-> Codec (BlockFetch block point) DeserialiseFailure m ByteString
-> ProtocolSizeLimits (BlockFetch block point) ByteString
-> ProtocolTimeLimits (BlockFetch block point)
-> Channel m ByteString
-> Peer
(BlockFetch block point) 'AsServer 'NonPipelined 'BFIdle m a
-> m (a, Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
MonadTimer m, ShowProxy ps,
forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runPeerWithLimits Tracer m (TraceSendRecv (BlockFetch block point))
tracer Codec (BlockFetch block point) DeserialiseFailure m ByteString
codec ((ByteString -> Word)
-> ProtocolSizeLimits (BlockFetch block point) ByteString
forall {k} {k1} bytes (block :: k) (point :: k1).
(bytes -> Word)
-> ProtocolSizeLimits (BlockFetch block point) bytes
byteLimitsBlockFetch (Int64 -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> Word) -> (ByteString -> Int64) -> ByteString -> Word
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Int64
LBS.length))
ProtocolTimeLimits (BlockFetch block point)
forall {k} {k1} (block :: k) (point :: k1).
ProtocolTimeLimits (BlockFetch block point)
timeLimitsBlockFetch Channel m ByteString
channel (BlockFetchServer block point m a
-> Peer
(BlockFetch block point) 'AsServer 'NonPipelined 'BFIdle m a
forall block point (m :: * -> *) a.
Functor m =>
BlockFetchServer block point m a
-> Server (BlockFetch block point) 'NonPipelined 'BFIdle m a
blockFetchServerPeer BlockFetchServer block point m a
server)
where
codec :: Codec (BlockFetch block point) DeserialiseFailure m ByteString
codec = (block -> Encoding)
-> (forall s. Decoder s block)
-> (point -> Encoding)
-> (forall s. Decoder s point)
-> Codec (BlockFetch block point) DeserialiseFailure m ByteString
forall block point (m :: * -> *).
MonadST m =>
(block -> Encoding)
-> (forall s. Decoder s block)
-> (point -> Encoding)
-> (forall s. Decoder s point)
-> Codec (BlockFetch block point) DeserialiseFailure m ByteString
codecBlockFetch block -> Encoding
forall a. Serialise a => a -> Encoding
encode Decoder s block
forall s. Decoder s block
forall a s. Serialise a => Decoder s a
decode point -> Encoding
forall a. Serialise a => a -> Encoding
encode Decoder s point
forall s. Decoder s point
forall a s. Serialise a => Decoder s a
decode
runFetchClientAndServerAsync
:: forall peerid block header version m a b.
(MonadAsync m, MonadDelay m, MonadFork m, MonadMask m,
MonadThrow (STM m), MonadST m, MonadTime m, MonadTimer m,
Ord peerid, Show peerid,
Serialise block, Serialise (HeaderHash block),
ShowProxy block)
=> Tracer m (TraceSendRecv (BlockFetch block (Point block)))
-> Tracer m (TraceSendRecv (BlockFetch block (Point block)))
-> version
-> Maybe DiffTime
-> Maybe DiffTime
-> FetchClientRegistry peerid header block m
-> peerid
-> ( FetchClientContext header block m
-> ClientPipelined (BlockFetch block (Point block)) BFIdle m a)
-> BlockFetchServer block (Point block) m b
-> m (Async m a, Async m b, Async m (), Async m ())
runFetchClientAndServerAsync :: forall peerid block header version (m :: * -> *) a b.
(MonadAsync m, MonadDelay m, MonadFork m, MonadMask m,
MonadThrow (STM m), MonadST m, MonadTime m, MonadTimer m,
Ord peerid, Show peerid, Serialise block,
Serialise (HeaderHash block), ShowProxy block) =>
Tracer m (TraceSendRecv (BlockFetch block (Point block)))
-> Tracer m (TraceSendRecv (BlockFetch block (Point block)))
-> version
-> Maybe DiffTime
-> Maybe DiffTime
-> FetchClientRegistry peerid header block m
-> peerid
-> (FetchClientContext header block m
-> ClientPipelined (BlockFetch block (Point block)) 'BFIdle m a)
-> BlockFetchServer block (Point block) m b
-> m (Async m a, Async m b, Async m (), Async m ())
runFetchClientAndServerAsync Tracer m (TraceSendRecv (BlockFetch block (Point block)))
clientTracer Tracer m (TraceSendRecv (BlockFetch block (Point block)))
serverTracer
version
version
Maybe DiffTime
clientDelay Maybe DiffTime
serverDelay
FetchClientRegistry peerid header block m
registry peerid
peerid FetchClientContext header block m
-> ClientPipelined (BlockFetch block (Point block)) 'BFIdle m a
client BlockFetchServer block (Point block) m b
server = do
(clientChannel, serverChannel) <- m (Channel m ByteString, Channel m ByteString)
forall (m :: * -> *) a. MonadSTM m => m (Channel m a, Channel m a)
createConnectedChannels
clientAsync <- async $ do
threadId <- myThreadId
labelThread threadId ("block-fetch-client-" ++ show peerid)
runFetchClient
clientTracer
version
registry peerid
(fromMaybe id (delayChannel <$> clientDelay) clientChannel)
client
serverAsync <- async $ do
threadId <- myThreadId
labelThread threadId ("block-fetch-server-" ++ show peerid)
runFetchServer
serverTracer
(fromMaybe id (delayChannel <$> serverDelay) serverChannel)
server
syncClientAsync <- async $ do
threadId <- myThreadId
labelThread threadId ("registry-" ++ show peerid)
bracketSyncWithFetchClient
registry peerid
(forever (threadDelay 1000) >> return ())
keepAliveAsync <- async $ do
threadId <- myThreadId
labelThread threadId ("keep-alive-" ++ show peerid)
bracketKeepAliveClient
registry peerid
(\StrictTVar m (Map peerid PeerGSV)
_ -> m () -> m Any
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (DiffTime -> m ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
1000) m Any -> m () -> m ()
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
return (clientAsync, serverAsync, syncClientAsync, keepAliveAsync)
mockBlockFetchServer1 :: forall block m.
(MonadSTM m, HasHeader block)
=> AnchoredFragment block
-> BlockFetchServer block (Point block) m ()
mockBlockFetchServer1 :: forall block (m :: * -> *).
(MonadSTM m, HasHeader block) =>
AnchoredFragment block -> BlockFetchServer block (Point block) m ()
mockBlockFetchServer1 AnchoredFragment block
chain =
BlockFetchServer block (Point block) m ()
senderSide
where
senderSide :: BlockFetchServer block (Point block) m ()
senderSide :: BlockFetchServer block (Point block) m ()
senderSide = (ChainRange (Point block)
-> m (BlockFetchBlockSender block (Point block) m ()))
-> () -> BlockFetchServer block (Point block) m ()
forall point (m :: * -> *) block a.
(ChainRange point -> m (BlockFetchBlockSender block point m a))
-> a -> BlockFetchServer block point m a
BlockFetchServer ChainRange (Point block)
-> m (BlockFetchBlockSender block (Point block) m ())
receiveReq ()
receiveReq :: ChainRange (Point block)
-> m (BlockFetchBlockSender block (Point block) m ())
receiveReq :: ChainRange (Point block)
-> m (BlockFetchBlockSender block (Point block) m ())
receiveReq (ChainRange Point block
lpoint Point block
upoint) =
Bool
-> m (BlockFetchBlockSender block (Point block) m ())
-> m (BlockFetchBlockSender block (Point block) m ())
forall a. HasCallStack => Bool -> a -> a
assert (Point block -> WithOrigin SlotNo
forall {k} (block :: k). Point block -> WithOrigin SlotNo
pointSlot Point block
lpoint WithOrigin SlotNo -> WithOrigin SlotNo -> Bool
forall a. Ord a => a -> a -> Bool
<= Point block -> WithOrigin SlotNo
forall {k} (block :: k). Point block -> WithOrigin SlotNo
pointSlot Point block
upoint) (m (BlockFetchBlockSender block (Point block) m ())
-> m (BlockFetchBlockSender block (Point block) m ()))
-> m (BlockFetchBlockSender block (Point block) m ())
-> m (BlockFetchBlockSender block (Point block) m ())
forall a b. (a -> b) -> a -> b
$
case AnchoredFragment block
-> Point block -> Point block -> Maybe (AnchoredFragment block)
forall block.
HasHeader block =>
AnchoredFragment block
-> Point block -> Point block -> Maybe (AnchoredFragment block)
AnchoredFragment.sliceRange AnchoredFragment block
chain Point block
lpoint Point block
upoint of
Maybe (AnchoredFragment block)
Nothing -> BlockFetchBlockSender block (Point block) m ()
-> m (BlockFetchBlockSender block (Point block) m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (BlockFetchBlockSender block (Point block) m ()
-> m (BlockFetchBlockSender block (Point block) m ()))
-> BlockFetchBlockSender block (Point block) m ()
-> m (BlockFetchBlockSender block (Point block) m ())
forall a b. (a -> b) -> a -> b
$ m (BlockFetchServer block (Point block) m ())
-> BlockFetchBlockSender block (Point block) m ()
forall (m :: * -> *) block point a.
m (BlockFetchServer block point m a)
-> BlockFetchBlockSender block point m a
SendMsgNoBlocks (BlockFetchServer block (Point block) m ()
-> m (BlockFetchServer block (Point block) m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return BlockFetchServer block (Point block) m ()
senderSide)
Just AnchoredFragment block
chain' -> BlockFetchBlockSender block (Point block) m ()
-> m (BlockFetchBlockSender block (Point block) m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (BlockFetchBlockSender block (Point block) m ()
-> m (BlockFetchBlockSender block (Point block) m ()))
-> BlockFetchBlockSender block (Point block) m ()
-> m (BlockFetchBlockSender block (Point block) m ())
forall a b. (a -> b) -> a -> b
$ m (BlockFetchSendBlocks block (Point block) m ())
-> BlockFetchBlockSender block (Point block) m ()
forall (m :: * -> *) block point a.
m (BlockFetchSendBlocks block point m a)
-> BlockFetchBlockSender block point m a
SendMsgStartBatch ([block] -> m (BlockFetchSendBlocks block (Point block) m ())
sendBlocks [block]
blocks)
where blocks :: [block]
blocks = AnchoredFragment block -> [block]
forall v a b. AnchoredSeq v a b -> [b]
AnchoredFragment.toOldestFirst AnchoredFragment block
chain'
sendBlocks :: [block] -> m (BlockFetchSendBlocks block (Point block) m ())
sendBlocks :: [block] -> m (BlockFetchSendBlocks block (Point block) m ())
sendBlocks [] = BlockFetchSendBlocks block (Point block) m ()
-> m (BlockFetchSendBlocks block (Point block) m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (BlockFetchSendBlocks block (Point block) m ()
-> m (BlockFetchSendBlocks block (Point block) m ()))
-> BlockFetchSendBlocks block (Point block) m ()
-> m (BlockFetchSendBlocks block (Point block) m ())
forall a b. (a -> b) -> a -> b
$ m (BlockFetchServer block (Point block) m ())
-> BlockFetchSendBlocks block (Point block) m ()
forall (m :: * -> *) block point a.
m (BlockFetchServer block point m a)
-> BlockFetchSendBlocks block point m a
SendMsgBatchDone (BlockFetchServer block (Point block) m ()
-> m (BlockFetchServer block (Point block) m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return BlockFetchServer block (Point block) m ()
senderSide)
sendBlocks (block
b:[block]
bs) = BlockFetchSendBlocks block (Point block) m ()
-> m (BlockFetchSendBlocks block (Point block) m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (BlockFetchSendBlocks block (Point block) m ()
-> m (BlockFetchSendBlocks block (Point block) m ()))
-> BlockFetchSendBlocks block (Point block) m ()
-> m (BlockFetchSendBlocks block (Point block) m ())
forall a b. (a -> b) -> a -> b
$ block
-> m (BlockFetchSendBlocks block (Point block) m ())
-> BlockFetchSendBlocks block (Point block) m ()
forall block (m :: * -> *) point a.
block
-> m (BlockFetchSendBlocks block point m a)
-> BlockFetchSendBlocks block point m a
SendMsgBlock block
b ([block] -> m (BlockFetchSendBlocks block (Point block) m ())
sendBlocks [block]
bs)
data TestFetchedBlockHeap m block = TestFetchedBlockHeap {
forall (m :: * -> *) block.
TestFetchedBlockHeap m block -> STM m (Set (Point block))
getTestFetchedBlocks :: STM m (Set (Point block)),
forall (m :: * -> *) block.
TestFetchedBlockHeap m block -> Point block -> block -> m ()
addTestFetchedBlock :: Point block -> block -> m ()
}
mkTestFetchedBlockHeap :: (MonadSTM m, Ord (Point block))
=> [Point block]
-> m (TestFetchedBlockHeap m block)
mkTestFetchedBlockHeap :: forall (m :: * -> *) block.
(MonadSTM m, Ord (Point block)) =>
[Point block] -> m (TestFetchedBlockHeap m block)
mkTestFetchedBlockHeap [Point block]
points = do
v <- Set (Point block) -> m (StrictTVar m (Set (Point block)))
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ([Point block] -> Set (Point block)
forall a. Ord a => [a] -> Set a
Set.fromList [Point block]
points)
return TestFetchedBlockHeap {
getTestFetchedBlocks = readTVar v,
addTestFetchedBlock = \Point block
p block
_b -> STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (StrictTVar m (Set (Point block))
-> (Set (Point block) -> Set (Point block)) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> (a -> a) -> STM m ()
modifyTVar StrictTVar m (Set (Point block))
v (Point block -> Set (Point block) -> Set (Point block)
forall a. Ord a => a -> Set a -> Set a
Set.insert Point block
p))
}