Safe Haskell | None |
---|---|
Language | Haskell2010 |
Let's start with the big picture...
Key: ┏━━━━━━━━━━━━┓ ╔═════════════╗ ┏━━━━━━━━━━━━━━┓ ╔════════════╗ ┃ STM-based ┃ ║active thread║ ┃state instance┃┓ ║ one thread ║╗ ┃shared state┃ ║ ║ ┃ per peer ┃┃ ║ per peer ║║ ┗━━━━━━━━━━━━┛ ╚═════════════╝ ┗━━━━━━━━━━━━━━┛┃ ╚════════════╝║ ┗━━━━━━━━━━━━━━┛ ╚════════════╝
╔═════════════╗ ┏━━━━━━━━━━━━━┓ ║ Chain sync ║╗ ┃ Ledger ┃ ║ protocol ║║◀───┨ state ┃◀───────────╮ ║(client side)║║ ┃ ┃ │ ╚══════╤══════╝║ ┗━━━━━━━━━━━━━┛ │ ╚═════╪═══════╝ │ ▼ │ ┏━━━━━━━━━━━━━┓ ┏━━━━━━━━━━━━━┓ ╔══════╧══════╗ ┃ Candidate ┃ ┃ Set of ┃ ║ Chain and ║ ┃ chains ┃ ┃ downloaded ┠────▶║ ledger ║ ┃ (headers) ┃ ┃ blocks ┃ ║ validation ║ ┗━━━━━┯━━━━━━━┛ ┗━━━━━┯━━━━━━━┛ ╚══════╤══════╝ │ │ ▲ │ │ ╭─────────────────╯ │ │ ░░░░░░░░▼░▼░░░░░░░░ │ ▼ ░░╔═════════════╗░░ │ ┏━━━━━━━━━━━━━┓ ╔═════════════╗ ░░║ Block ║░░ │ ┃ Current ┃ ║ Block fetch ║╗ ░░╢ fetch ║◀────────────┼───────────┨ chain ┠────▶║ protocol ║║ ░░║ logic ║░░ │ ┃ (blocks) ┃ ║(server side)║║ ░░╚═════════════╝░░ │ ┠─────────────┨ ╚═════════════╝║ ░░░░░░░░░▲░░░░░░░░░ │ ┃ Tentative ┃ ╚═════════════╝ ░░░░░░░░░▼░░░░░░░░░░░░░░░░░░░░│░░░░░░░░ ┃ chain ┠──╮ ░░┏━━━━━━━━━━━━━┓░░░░░╔═══════╧═════╗░░ ┃ (headers) ┃ │ ╔═════════════╗ ░░┃ Block fetch ┃┓░░░░║ block fetch ║╗░ ┗━━━━━━━━━━━━━┛ │ ║ Chain sync ║╗ ░░┃ state and ┃┃◀──▶║ protocol ║║░ ╰─▶║ protocol ║║ ░░┃ requests ┃┃░░░░║(client side)║║░ ║(server side)║║ ░░┗━━━━━━━━━━━━━┛┃░░░░╚═════════════╝║░ ╚═════════════╝║ ░░░┗━━━━━━━━━━━━━┛░░░░░╚═════════════╝░ ╚═════════════╝ ░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░
Notes:
- Thread communication is via STM based state.
- Outbound: threads update STM state.
- Inbound: threads wait on STM state changing (using retry).
- These are no queues: there is only the current state, not all change events.
We consider the block fetch logic and the policy for the block fetch protocol client together as one unit of functionality. This is the shaded area in the diagram.
Looking at the diagram we see that these two threads interact with each other and other threads via the following shared state
State | Interactions | Internal/External |
---|---|---|
Candidate chains (headers) | Read | External |
Current chain (blocks) | Read | External |
Set of downloaded blocks | Read & Write | External |
Block fetch requests | Read & Write | Internal |
The block fetch requests state is private between the block fetch logic and the block fetch protocol client, so it is implemented here.
The other state is managed by the consensus layer and is considered external here. So here we define interfaces for interacting with the external state. These have to be provided when instantiating the block fetch logic.
Synopsis
- blockFetchLogic :: forall addr header block m. (HasHeader header, HasHeader block, HeaderHash header ~ HeaderHash block, MonadDelay m, MonadTimer m, Ord addr, Hashable addr) => Tracer m (TraceDecisionEvent addr header) -> Tracer m (TraceLabelPeer addr (TraceFetchClientState header)) -> BlockFetchConsensusInterface addr header block m -> FetchClientRegistry addr header block m -> BlockFetchConfiguration -> m Void
- data BlockFetchConfiguration = BlockFetchConfiguration {}
- data BlockFetchConsensusInterface peer header block (m :: Type -> Type) = BlockFetchConsensusInterface {
- readCandidateChains :: STM m (Map peer (AnchoredFragment header))
- readCurrentChain :: STM m (AnchoredFragment header)
- readFetchMode :: STM m FetchMode
- readFetchedBlocks :: STM m (Point block -> Bool)
- mkAddFetchedBlock :: STM m (Point block -> block -> m ())
- readFetchedMaxSlotNo :: STM m MaxSlotNo
- plausibleCandidateChain :: HasCallStack => AnchoredFragment header -> AnchoredFragment header -> Bool
- compareCandidateChains :: HasCallStack => AnchoredFragment header -> AnchoredFragment header -> Ordering
- blockFetchSize :: header -> SizeInBytes
- blockMatchesHeader :: header -> block -> Bool
- headerForgeUTCTime :: FromConsensus header -> STM m UTCTime
- readChainSelStarvation :: STM m ChainSelStarvation
- demoteChainSyncJumpingDynamo :: peer -> m ()
- data GenesisBlockFetchConfiguration = GenesisBlockFetchConfiguration {}
- type FetchDecision result = Either FetchDecline result
- data TraceFetchClientState header
- = AddedFetchRequest (FetchRequest header) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header)
- | AcknowledgedFetchRequest (FetchRequest header)
- | SendFetchRequest (AnchoredFragment header) PeerGSV
- | StartedFetchBatch (ChainRange (Point header)) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header)
- | CompletedBlockFetch (Point header) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header) NominalDiffTime SizeInBytes
- | CompletedFetchBatch (ChainRange (Point header)) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header)
- | RejectedFetchBatch (ChainRange (Point header)) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header)
- | ClientTerminating Int
- data TraceLabelPeer peerid a = TraceLabelPeer peerid a
- data FetchClientRegistry peer header block (m :: Type -> Type)
- newFetchClientRegistry :: MonadSTM m => m (FetchClientRegistry peer header block m)
- bracketFetchClient :: forall m a peer header block version. (MonadFork m, MonadMask m, MonadTimer m, Ord peer) => FetchClientRegistry peer header block m -> version -> peer -> (FetchClientContext header block m -> m a) -> m a
- bracketSyncWithFetchClient :: forall m a peer header block. (MonadSTM m, MonadFork m, MonadCatch m, Ord peer) => FetchClientRegistry peer header block m -> peer -> m a -> m a
- bracketKeepAliveClient :: forall m a peer header block. (MonadSTM m, MonadFork m, MonadMask m, Ord peer) => FetchClientRegistry peer header block m -> peer -> (StrictTVar m (Map peer PeerGSV) -> m a) -> m a
- data PraosFetchMode
- data FetchMode
- newtype FromConsensus a = FromConsensus {
- unFromConsensus :: a
- data SizeInBytes
Documentation
blockFetchLogic :: forall addr header block m. (HasHeader header, HasHeader block, HeaderHash header ~ HeaderHash block, MonadDelay m, MonadTimer m, Ord addr, Hashable addr) => Tracer m (TraceDecisionEvent addr header) -> Tracer m (TraceLabelPeer addr (TraceFetchClientState header)) -> BlockFetchConsensusInterface addr header block m -> FetchClientRegistry addr header block m -> BlockFetchConfiguration -> m Void Source #
Execute the block fetch logic. It monitors the current chain and candidate chains. It decided which block bodies to fetch and manages the process of fetching them, including making alternative decisions based on timeouts and failures.
This runs forever and should be shut down using mechanisms such as async.
data BlockFetchConfiguration Source #
Configuration for FetchDecisionPolicy. Should be determined by external local node config.
BlockFetchConfiguration | |
|
Instances
Show BlockFetchConfiguration Source # | |
Defined in Ouroboros.Network.BlockFetch showsPrec :: Int -> BlockFetchConfiguration -> ShowS # show :: BlockFetchConfiguration -> String # showList :: [BlockFetchConfiguration] -> ShowS # |
data BlockFetchConsensusInterface peer header block (m :: Type -> Type) #
The consensus layer functionality that the block fetch logic requires.
These are provided as input to the block fetch by the consensus layer.
BlockFetchConsensusInterface | |
|
data GenesisBlockFetchConfiguration Source #
BlockFetch configuration parameters specific to Genesis.
GenesisBlockFetchConfiguration | |
|
Instances
Tracer types
type FetchDecision result = Either FetchDecline result Source #
Throughout the decision making process we accumulate reasons to decline to fetch any blocks. This type is used to wrap intermediate and final results.
data TraceFetchClientState header Source #
Tracing types for the various events that change the state
(i.e. FetchClientStateVars
) for a block fetch client.
Note that while these are all state changes, the AddedFetchRequest
occurs
in the decision thread while the other state changes occur in the block
fetch client threads.
AddedFetchRequest (FetchRequest header) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header) | The block fetch decision thread has added a new fetch instruction consisting of one or more individual request ranges. |
AcknowledgedFetchRequest (FetchRequest header) | Mark the point when the fetch client picks up the request added
by the block fetch decision thread. Note that this event can happen
fewer times than the |
SendFetchRequest (AnchoredFragment header) PeerGSV | Mark the point when fetch request for a fragment is actually sent over the wire. |
StartedFetchBatch (ChainRange (Point header)) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header) | Mark the start of receiving a streaming batch of blocks. This will
be followed by one or more |
CompletedBlockFetch (Point header) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header) NominalDiffTime SizeInBytes | Mark the completion of of receiving a single block within a streaming batch of blocks. |
CompletedFetchBatch (ChainRange (Point header)) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header) | Mark the successful end of receiving a streaming batch of blocks |
RejectedFetchBatch (ChainRange (Point header)) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header) | If the other peer rejects our request then we have this event
instead of |
ClientTerminating Int | The client is terminating. Log the number of outstanding requests. |
Instances
(StandardHash header, Show header) => Show (TraceFetchClientState header) Source # | |
Defined in Ouroboros.Network.BlockFetch.ClientState showsPrec :: Int -> TraceFetchClientState header -> ShowS # show :: TraceFetchClientState header -> String # showList :: [TraceFetchClientState header] -> ShowS # |
data TraceLabelPeer peerid a #
A peer label for use in Tracer
s. This annotates tracer output as being
associated with a given peer identifier.
TraceLabelPeer peerid a |
Instances
Bifunctor TraceLabelPeer | |
Defined in Network.Mux.Trace bimap :: (a -> b) -> (c -> d) -> TraceLabelPeer a c -> TraceLabelPeer b d # first :: (a -> b) -> TraceLabelPeer a c -> TraceLabelPeer b c # second :: (b -> c) -> TraceLabelPeer a b -> TraceLabelPeer a c # | |
Functor (TraceLabelPeer peerid) | |
Defined in Network.Mux.Trace fmap :: (a -> b) -> TraceLabelPeer peerid a -> TraceLabelPeer peerid b # (<$) :: a -> TraceLabelPeer peerid b -> TraceLabelPeer peerid a # | |
(Show peerid, Show a) => Show (TraceLabelPeer peerid a) | |
Defined in Network.Mux.Trace showsPrec :: Int -> TraceLabelPeer peerid a -> ShowS # show :: TraceLabelPeer peerid a -> String # showList :: [TraceLabelPeer peerid a] -> ShowS # | |
(Eq peerid, Eq a) => Eq (TraceLabelPeer peerid a) | |
Defined in Network.Mux.Trace (==) :: TraceLabelPeer peerid a -> TraceLabelPeer peerid a -> Bool # (/=) :: TraceLabelPeer peerid a -> TraceLabelPeer peerid a -> Bool # |
The FetchClientRegistry
data FetchClientRegistry peer header block (m :: Type -> Type) Source #
A registry for the threads that are executing the client side of the
BlockFetch
protocol to communicate with our peers.
The registry contains the shared variables we use to communicate with these threads, both to track their status and to provide instructions.
The threads add/remove themselves to/from this registry when they start up and shut down.
newFetchClientRegistry :: MonadSTM m => m (FetchClientRegistry peer header block m) Source #
bracketFetchClient :: forall m a peer header block version. (MonadFork m, MonadMask m, MonadTimer m, Ord peer) => FetchClientRegistry peer header block m -> version -> peer -> (FetchClientContext header block m -> m a) -> m a Source #
This is needed to start a block fetch client. It provides the required
FetchClientContext
. It registers and unregisters the fetch client on
start and end.
It also manages synchronisation with the corresponding chain sync client.
bracketSyncWithFetchClient :: forall m a peer header block. (MonadSTM m, MonadFork m, MonadCatch m, Ord peer) => FetchClientRegistry peer header block m -> peer -> m a -> m a Source #
The block fetch and chain sync clients for each peer need to synchronise their startup and shutdown. This bracket operation provides that synchronisation for the chain sync client.
This must be used for the chain sync client outside of its own state registration and deregistration.
bracketKeepAliveClient :: forall m a peer header block. (MonadSTM m, MonadFork m, MonadMask m, Ord peer) => FetchClientRegistry peer header block m -> peer -> (StrictTVar m (Map peer PeerGSV) -> m a) -> m a Source #
Re-export types used by BlockFetchConsensusInterface
data PraosFetchMode #
FetchModeBulkSync | Use this mode when we are catching up on the chain but are stil well behind. In this mode the fetch logic will optimise for throughput rather than latency. |
FetchModeDeadline | Use this mode for block-producing nodes that have a known deadline to produce a block and need to get the best chain before that. In this mode the fetch logic will optimise for picking the best chain within the given deadline. |
Instances
Show PraosFetchMode | |
Defined in Ouroboros.Network.BlockFetch.ConsensusInterface showsPrec :: Int -> PraosFetchMode -> ShowS # show :: PraosFetchMode -> String # showList :: [PraosFetchMode] -> ShowS # | |
Eq PraosFetchMode | |
Defined in Ouroboros.Network.BlockFetch.ConsensusInterface (==) :: PraosFetchMode -> PraosFetchMode -> Bool # (/=) :: PraosFetchMode -> PraosFetchMode -> Bool # |
newtype FromConsensus a #
A new type used to emphasize the precondition of
headerForgeUTCTime
and
blockForgeUTCTime
at each
call site.
At time of writing, the a
is either a header or a block. The headers are
literally from Consensus (ie provided by ChainSync). Blocks, on the other
hand, are indirectly from Consensus: they were fetched only because we
favored the corresponding header that Consensus provided.
Instances
Applicative FromConsensus | |
Defined in Ouroboros.Network.BlockFetch.ConsensusInterface pure :: a -> FromConsensus a # (<*>) :: FromConsensus (a -> b) -> FromConsensus a -> FromConsensus b # liftA2 :: (a -> b -> c) -> FromConsensus a -> FromConsensus b -> FromConsensus c # (*>) :: FromConsensus a -> FromConsensus b -> FromConsensus b # (<*) :: FromConsensus a -> FromConsensus b -> FromConsensus a # | |
Functor FromConsensus | |
Defined in Ouroboros.Network.BlockFetch.ConsensusInterface fmap :: (a -> b) -> FromConsensus a -> FromConsensus b # (<$) :: a -> FromConsensus b -> FromConsensus a # |
data SizeInBytes #
Instances
NFData SizeInBytes | |||||
Defined in Ouroboros.Network.SizeInBytes rnf :: SizeInBytes -> () # | |||||
Monoid SizeInBytes | |||||
Defined in Ouroboros.Network.SizeInBytes mempty :: SizeInBytes # mappend :: SizeInBytes -> SizeInBytes -> SizeInBytes # mconcat :: [SizeInBytes] -> SizeInBytes # | |||||
Semigroup SizeInBytes | |||||
Defined in Ouroboros.Network.SizeInBytes (<>) :: SizeInBytes -> SizeInBytes -> SizeInBytes # sconcat :: NonEmpty SizeInBytes -> SizeInBytes # stimes :: Integral b => b -> SizeInBytes -> SizeInBytes # | |||||
Enum SizeInBytes | |||||
Defined in Ouroboros.Network.SizeInBytes succ :: SizeInBytes -> SizeInBytes # pred :: SizeInBytes -> SizeInBytes # toEnum :: Int -> SizeInBytes # fromEnum :: SizeInBytes -> Int # enumFrom :: SizeInBytes -> [SizeInBytes] # enumFromThen :: SizeInBytes -> SizeInBytes -> [SizeInBytes] # enumFromTo :: SizeInBytes -> SizeInBytes -> [SizeInBytes] # enumFromThenTo :: SizeInBytes -> SizeInBytes -> SizeInBytes -> [SizeInBytes] # | |||||
Generic SizeInBytes | |||||
Defined in Ouroboros.Network.SizeInBytes
from :: SizeInBytes -> Rep SizeInBytes x # to :: Rep SizeInBytes x -> SizeInBytes # | |||||
Num SizeInBytes | |||||
Defined in Ouroboros.Network.SizeInBytes (+) :: SizeInBytes -> SizeInBytes -> SizeInBytes # (-) :: SizeInBytes -> SizeInBytes -> SizeInBytes # (*) :: SizeInBytes -> SizeInBytes -> SizeInBytes # negate :: SizeInBytes -> SizeInBytes # abs :: SizeInBytes -> SizeInBytes # signum :: SizeInBytes -> SizeInBytes # fromInteger :: Integer -> SizeInBytes # | |||||
Integral SizeInBytes | |||||
Defined in Ouroboros.Network.SizeInBytes quot :: SizeInBytes -> SizeInBytes -> SizeInBytes # rem :: SizeInBytes -> SizeInBytes -> SizeInBytes # div :: SizeInBytes -> SizeInBytes -> SizeInBytes # mod :: SizeInBytes -> SizeInBytes -> SizeInBytes # quotRem :: SizeInBytes -> SizeInBytes -> (SizeInBytes, SizeInBytes) # divMod :: SizeInBytes -> SizeInBytes -> (SizeInBytes, SizeInBytes) # toInteger :: SizeInBytes -> Integer # | |||||
Real SizeInBytes | |||||
Defined in Ouroboros.Network.SizeInBytes toRational :: SizeInBytes -> Rational # | |||||
Show SizeInBytes | |||||
Defined in Ouroboros.Network.SizeInBytes showsPrec :: Int -> SizeInBytes -> ShowS # show :: SizeInBytes -> String # showList :: [SizeInBytes] -> ShowS # | |||||
Eq SizeInBytes | |||||
Defined in Ouroboros.Network.SizeInBytes (==) :: SizeInBytes -> SizeInBytes -> Bool # (/=) :: SizeInBytes -> SizeInBytes -> Bool # | |||||
Ord SizeInBytes | |||||
Defined in Ouroboros.Network.SizeInBytes compare :: SizeInBytes -> SizeInBytes -> Ordering # (<) :: SizeInBytes -> SizeInBytes -> Bool # (<=) :: SizeInBytes -> SizeInBytes -> Bool # (>) :: SizeInBytes -> SizeInBytes -> Bool # (>=) :: SizeInBytes -> SizeInBytes -> Bool # max :: SizeInBytes -> SizeInBytes -> SizeInBytes # min :: SizeInBytes -> SizeInBytes -> SizeInBytes # | |||||
BoundedMeasure SizeInBytes | |||||
Defined in Ouroboros.Network.SizeInBytes maxBound :: SizeInBytes # | |||||
Measure SizeInBytes | |||||
Defined in Ouroboros.Network.SizeInBytes zero :: SizeInBytes # plus :: SizeInBytes -> SizeInBytes -> SizeInBytes # min :: SizeInBytes -> SizeInBytes -> SizeInBytes # max :: SizeInBytes -> SizeInBytes -> SizeInBytes # | |||||
NoThunks SizeInBytes | |||||
Defined in Ouroboros.Network.SizeInBytes noThunks :: Context -> SizeInBytes -> IO (Maybe ThunkInfo) # wNoThunks :: Context -> SizeInBytes -> IO (Maybe ThunkInfo) # showTypeOf :: Proxy SizeInBytes -> String # | |||||
type Rep SizeInBytes | |||||
Defined in Ouroboros.Network.SizeInBytes type Rep SizeInBytes = D1 ('MetaData "SizeInBytes" "Ouroboros.Network.SizeInBytes" "ouroboros-network-api-0.12.0.0-inplace" 'True) (C1 ('MetaCons "SizeInBytes" 'PrefixI 'True) (S1 ('MetaSel ('Just "getSizeInBytes") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Word32))) |