{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeOperators #-}
{-# OPTIONS_GHC -Wno-orphans #-}
module Test.Ouroboros.Network.Testnet.Node.MiniProtocols
( Codecs
, cborCodecs
, LimitsAndTimeouts (..)
, AppArgs (..)
, applications
) where
import Control.Applicative (Alternative)
import Control.Concurrent.Class.MonadMVar (MonadMVar)
import Control.Concurrent.Class.MonadSTM qualified as LazySTM
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadSay
import Control.Monad.Class.MonadST
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer (..), contramap, nullTracer)
import Data.ByteString.Lazy (ByteString)
import Data.Functor (($>))
import Data.Maybe (fromMaybe)
import Data.Void (Void)
import System.Random (RandomGen, StdGen)
import Codec.CBOR.Read qualified as CBOR
import Codec.Serialise qualified as Serialise
import Network.TypedProtocol.Codec
import Network.TypedProtocol.PingPong.Client as PingPong
import Network.TypedProtocol.PingPong.Codec.CBOR
import Network.TypedProtocol.PingPong.Examples
import Network.TypedProtocol.PingPong.Server
import Network.TypedProtocol.PingPong.Type
import Ouroboros.Network.BlockFetch
import Ouroboros.Network.BlockFetch.Client
import Ouroboros.Network.Protocol.BlockFetch.Codec
import Ouroboros.Network.Protocol.BlockFetch.Examples
import Ouroboros.Network.Protocol.BlockFetch.Server
import Ouroboros.Network.Protocol.BlockFetch.Type
import Ouroboros.Network.Protocol.ChainSync.Client
import Ouroboros.Network.Protocol.ChainSync.Codec
import Ouroboros.Network.Protocol.ChainSync.Examples
import Ouroboros.Network.Protocol.ChainSync.Server
import Ouroboros.Network.Protocol.ChainSync.Type
import Ouroboros.Network.Protocol.Handshake.Type
import Ouroboros.Network.Protocol.Handshake.Unversioned
import Ouroboros.Network.Protocol.Handshake.Version (simpleSingletonVersions)
import Ouroboros.Network.Protocol.KeepAlive.Client
import Ouroboros.Network.Protocol.KeepAlive.Codec
import Ouroboros.Network.Protocol.KeepAlive.Server
import Ouroboros.Network.Protocol.KeepAlive.Type
import Data.Monoid.Synchronisation
import Ouroboros.Network.Block (HasHeader, HeaderHash, Point)
import Ouroboros.Network.Block qualified as Block
import Ouroboros.Network.Context
import Ouroboros.Network.ControlMessage (ControlMessage (..))
import Ouroboros.Network.Diffusion qualified as Diff (Applications (..))
import Ouroboros.Network.Driver.Limits
import Ouroboros.Network.KeepAlive
import Ouroboros.Network.Mock.Chain qualified as Chain
import Ouroboros.Network.Mock.ProducerState
import Ouroboros.Network.Mux
import Ouroboros.Network.NodeToNode.Version (DiffusionMode (..))
import Ouroboros.Network.Util.ShowProxy
import Ouroboros.Network.Mock.ConcreteBlock
import Network.TypedProtocol
import Network.Mux qualified as Mx
import Pipes qualified
import Ouroboros.Network.NodeToNode (blockFetchMiniProtocolNum,
chainSyncMiniProtocolNum, keepAliveMiniProtocolNum,
peerSharingMiniProtocolNum)
import Ouroboros.Network.PeerSelection.LedgerPeers
import Ouroboros.Network.PeerSelection.LocalRootPeers (OutboundConnectionsState)
import Ouroboros.Network.PeerSelection.PeerSharing qualified as PSTypes
import Ouroboros.Network.PeerSharing (PeerSharingAPI, bracketPeerSharingClient,
peerSharingClient, peerSharingServer)
import Ouroboros.Network.Protocol.PeerSharing.Client (peerSharingClientPeer)
import Ouroboros.Network.Protocol.PeerSharing.Codec (codecPeerSharing)
import Ouroboros.Network.Protocol.PeerSharing.Server (peerSharingServerPeer)
import Ouroboros.Network.Protocol.PeerSharing.Type (PeerSharing)
import Test.Ouroboros.Network.Testnet.Node.Kernel
data Codecs addr header block m = Codecs
{ forall addr header block (m :: * -> *).
Codecs addr header block m
-> Codec
(ChainSync header (Point block) (Tip block))
DeserialiseFailure
m
ByteString
chainSyncCodec :: Codec (ChainSync header (Point block) (Tip block))
CBOR.DeserialiseFailure m ByteString
, forall addr header block (m :: * -> *).
Codecs addr header block m
-> Codec
(BlockFetch block (Point block)) DeserialiseFailure m ByteString
blockFetchCodec :: Codec (BlockFetch block (Point block))
CBOR.DeserialiseFailure m ByteString
, forall addr header block (m :: * -> *).
Codecs addr header block m
-> Codec KeepAlive DeserialiseFailure m ByteString
keepAliveCodec :: Codec KeepAlive
CBOR.DeserialiseFailure m ByteString
, forall addr header block (m :: * -> *).
Codecs addr header block m
-> Codec PingPong DeserialiseFailure m ByteString
pingPongCodec :: Codec PingPong
CBOR.DeserialiseFailure m ByteString
, forall addr header block (m :: * -> *).
Codecs addr header block m
-> Codec (PeerSharing addr) DeserialiseFailure m ByteString
peerSharingCodec :: Codec (PeerSharing addr)
CBOR.DeserialiseFailure m ByteString
}
cborCodecs :: MonadST m => Codecs NtNAddr BlockHeader Block m
cborCodecs :: forall (m :: * -> *).
MonadST m =>
Codecs NtNAddr BlockHeader Block m
cborCodecs = Codecs
{ chainSyncCodec :: Codec
(ChainSync BlockHeader (Point Block) (Tip Block))
DeserialiseFailure
m
ByteString
chainSyncCodec = (BlockHeader -> Encoding)
-> (forall s. Decoder s BlockHeader)
-> (Point Block -> Encoding)
-> (forall s. Decoder s (Point Block))
-> (Tip Block -> Encoding)
-> (forall s. Decoder s (Tip Block))
-> Codec
(ChainSync BlockHeader (Point Block) (Tip Block))
DeserialiseFailure
m
ByteString
forall header point tip (m :: * -> *).
MonadST m =>
(header -> Encoding)
-> (forall s. Decoder s header)
-> (point -> Encoding)
-> (forall s. Decoder s point)
-> (tip -> Encoding)
-> (forall s. Decoder s tip)
-> Codec
(ChainSync header point tip) DeserialiseFailure m ByteString
codecChainSync BlockHeader -> Encoding
forall a. Serialise a => a -> Encoding
Serialise.encode Decoder s BlockHeader
forall s. Decoder s BlockHeader
forall a s. Serialise a => Decoder s a
Serialise.decode
Point Block -> Encoding
forall a. Serialise a => a -> Encoding
Serialise.encode Decoder s (Point Block)
forall s. Decoder s (Point Block)
forall a s. Serialise a => Decoder s a
Serialise.decode
((HeaderHash Block -> Encoding) -> Tip Block -> Encoding
forall {k} (blk :: k).
(HeaderHash blk -> Encoding) -> Tip blk -> Encoding
Block.encodeTip HeaderHash Block -> Encoding
ConcreteHeaderHash -> Encoding
forall a. Serialise a => a -> Encoding
Serialise.encode)
((forall s. Decoder s (HeaderHash Block))
-> forall s. Decoder s (Tip Block)
forall {k} (blk :: k).
(forall s. Decoder s (HeaderHash blk))
-> forall s. Decoder s (Tip blk)
Block.decodeTip Decoder s (HeaderHash Block)
Decoder s ConcreteHeaderHash
forall s. Decoder s (HeaderHash Block)
forall s. Decoder s ConcreteHeaderHash
forall a s. Serialise a => Decoder s a
Serialise.decode)
, blockFetchCodec :: Codec
(BlockFetch Block (Point Block)) DeserialiseFailure m ByteString
blockFetchCodec = (Block -> Encoding)
-> (forall s. Decoder s Block)
-> (Point Block -> Encoding)
-> (forall s. Decoder s (Point Block))
-> Codec
(BlockFetch Block (Point Block)) DeserialiseFailure m ByteString
forall block point (m :: * -> *).
MonadST m =>
(block -> Encoding)
-> (forall s. Decoder s block)
-> (point -> Encoding)
-> (forall s. Decoder s point)
-> Codec (BlockFetch block point) DeserialiseFailure m ByteString
codecBlockFetch Block -> Encoding
forall a. Serialise a => a -> Encoding
Serialise.encode Decoder s Block
forall s. Decoder s Block
forall a s. Serialise a => Decoder s a
Serialise.decode
Point Block -> Encoding
forall a. Serialise a => a -> Encoding
Serialise.encode Decoder s (Point Block)
forall s. Decoder s (Point Block)
forall a s. Serialise a => Decoder s a
Serialise.decode
, keepAliveCodec :: Codec KeepAlive DeserialiseFailure m ByteString
keepAliveCodec = Codec KeepAlive DeserialiseFailure m ByteString
forall (m :: * -> *).
MonadST m =>
Codec KeepAlive DeserialiseFailure m ByteString
codecKeepAlive_v2
, pingPongCodec :: Codec PingPong DeserialiseFailure m ByteString
pingPongCodec = Codec PingPong DeserialiseFailure m ByteString
forall (m :: * -> *).
MonadST m =>
Codec PingPong DeserialiseFailure m ByteString
codecPingPong
, peerSharingCodec :: Codec (PeerSharing NtNAddr) DeserialiseFailure m ByteString
peerSharingCodec = (NtNAddr -> Encoding)
-> (forall s. Decoder s NtNAddr)
-> Codec (PeerSharing NtNAddr) DeserialiseFailure m ByteString
forall (m :: * -> *) peerAddress.
MonadST m =>
(peerAddress -> Encoding)
-> (forall s. Decoder s peerAddress)
-> Codec (PeerSharing peerAddress) DeserialiseFailure m ByteString
codecPeerSharing NtNAddr -> Encoding
encodeNtNAddr Decoder s NtNAddr
forall s. Decoder s NtNAddr
decodeNtNAddr
}
data LimitsAndTimeouts header block = LimitsAndTimeouts
{
forall header block.
LimitsAndTimeouts header block -> MiniProtocolLimits
chainSyncLimits
:: MiniProtocolLimits
, forall header block.
LimitsAndTimeouts header block
-> ProtocolSizeLimits
(ChainSync header (Point block) (Tip block)) ByteString
chainSyncSizeLimits
:: ProtocolSizeLimits (ChainSync header (Point block) (Tip block))
ByteString
, forall header block.
LimitsAndTimeouts header block
-> ProtocolTimeLimits (ChainSync header (Point block) (Tip block))
chainSyncTimeLimits
:: ProtocolTimeLimits (ChainSync header (Point block) (Tip block))
, forall header block.
LimitsAndTimeouts header block -> MiniProtocolLimits
blockFetchLimits
:: MiniProtocolLimits
, forall header block.
LimitsAndTimeouts header block
-> ProtocolSizeLimits (BlockFetch block (Point block)) ByteString
blockFetchSizeLimits
:: ProtocolSizeLimits (BlockFetch block (Point block)) ByteString
, forall header block.
LimitsAndTimeouts header block
-> ProtocolTimeLimits (BlockFetch block (Point block))
blockFetchTimeLimits
:: ProtocolTimeLimits (BlockFetch block (Point block))
, forall header block.
LimitsAndTimeouts header block -> MiniProtocolLimits
keepAliveLimits
:: MiniProtocolLimits
, forall header block.
LimitsAndTimeouts header block
-> ProtocolSizeLimits KeepAlive ByteString
keepAliveSizeLimits
:: ProtocolSizeLimits KeepAlive ByteString
, forall header block.
LimitsAndTimeouts header block -> ProtocolTimeLimits KeepAlive
keepAliveTimeLimits
:: ProtocolTimeLimits KeepAlive
, forall header block.
LimitsAndTimeouts header block -> MiniProtocolLimits
pingPongLimits
:: MiniProtocolLimits
, forall header block.
LimitsAndTimeouts header block
-> ProtocolSizeLimits PingPong ByteString
pingPongSizeLimits
:: ProtocolSizeLimits PingPong ByteString
, forall header block.
LimitsAndTimeouts header block -> ProtocolTimeLimits PingPong
pingPongTimeLimits
:: ProtocolTimeLimits PingPong
, forall header block.
LimitsAndTimeouts header block -> MiniProtocolLimits
handshakeLimits
:: MiniProtocolLimits
, forall header block.
LimitsAndTimeouts header block
-> ProtocolTimeLimits (Handshake NtNVersion NtNVersionData)
handshakeTimeLimits
:: ProtocolTimeLimits (Handshake NtNVersion NtNVersionData)
, forall header block.
LimitsAndTimeouts header block
-> ProtocolSizeLimits
(Handshake NtNVersion NtNVersionData) ByteString
handhsakeSizeLimits
:: ProtocolSizeLimits (Handshake NtNVersion NtNVersionData) ByteString
, forall header block.
LimitsAndTimeouts header block -> MiniProtocolLimits
peerSharingLimits
:: MiniProtocolLimits
, forall header block.
LimitsAndTimeouts header block
-> ProtocolTimeLimits (PeerSharing NtNAddr)
peerSharingTimeLimits
:: ProtocolTimeLimits (PeerSharing NtNAddr)
, forall header block.
LimitsAndTimeouts header block
-> ProtocolSizeLimits (PeerSharing NtNAddr) ByteString
peerSharingSizeLimits
:: ProtocolSizeLimits (PeerSharing NtNAddr) ByteString
}
data AppArgs header block m = AppArgs
{ forall header block (m :: * -> *).
AppArgs header block m -> LedgerPeersConsensusInterface m
aaLedgerPeersConsensusInterface
:: LedgerPeersConsensusInterface m
, forall header block (m :: * -> *). AppArgs header block m -> StdGen
aaKeepAliveStdGen
:: StdGen
, forall header block (m :: * -> *).
AppArgs header block m -> DiffusionMode
aaDiffusionMode
:: DiffusionMode
, forall header block (m :: * -> *).
AppArgs header block m -> DiffTime
aaKeepAliveInterval
:: DiffTime
, forall header block (m :: * -> *).
AppArgs header block m -> DiffTime
aaPingPongInterval
:: DiffTime
, forall header block (m :: * -> *).
AppArgs header block m -> header -> m Bool
aaShouldChainSyncExit :: header -> m Bool
, forall header block (m :: * -> *). AppArgs header block m -> Bool
aaChainSyncEarlyExit :: Bool
, forall header block (m :: * -> *).
AppArgs header block m -> PeerSharing
aaOwnPeerSharing
:: PSTypes.PeerSharing
, forall header block (m :: * -> *).
AppArgs header block m -> OutboundConnectionsState -> STM m ()
aaUpdateOutboundConnectionsState
:: OutboundConnectionsState -> STM m ()
}
applications :: forall block header s m.
( Alternative (STM m)
, MonadAsync m
, MonadFork m
, MonadMask m
, MonadMVar m
, MonadSay m
, MonadThrow m
, MonadTime m
, MonadTimer m
, MonadThrow (STM m)
, HasHeader header
, HasHeader block
, HeaderHash header ~ HeaderHash block
, Show block
, ShowProxy block
, ShowProxy header
, RandomGen s
)
=> Tracer m String
-> NodeKernel header block s m
-> Codecs NtNAddr header block m
-> LimitsAndTimeouts header block
-> AppArgs header block m
-> (block -> header)
-> Diff.Applications NtNAddr NtNVersion NtNVersionData
NtCAddr NtCVersion NtCVersionData
m ()
applications :: forall block header s (m :: * -> *).
(Alternative (STM m), MonadAsync m, MonadFork m, MonadMask m,
MonadMVar m, MonadSay m, MonadThrow m, MonadTime m, MonadTimer m,
MonadThrow (STM m), HasHeader header, HasHeader block,
HeaderHash header ~ HeaderHash block, Show block, ShowProxy block,
ShowProxy header, RandomGen s) =>
Tracer m String
-> NodeKernel header block s m
-> Codecs NtNAddr header block m
-> LimitsAndTimeouts header block
-> AppArgs header block m
-> (block -> header)
-> Applications
NtNAddr
NtNVersion
NtNVersionData
NtCAddr
NtNVersion
NtCVersionData
m
()
applications Tracer m String
debugTracer NodeKernel header block s m
nodeKernel
Codecs { Codec
(ChainSync header (Point block) (Tip block))
DeserialiseFailure
m
ByteString
chainSyncCodec :: forall addr header block (m :: * -> *).
Codecs addr header block m
-> Codec
(ChainSync header (Point block) (Tip block))
DeserialiseFailure
m
ByteString
chainSyncCodec :: Codec
(ChainSync header (Point block) (Tip block))
DeserialiseFailure
m
ByteString
chainSyncCodec, Codec
(BlockFetch block (Point block)) DeserialiseFailure m ByteString
blockFetchCodec :: forall addr header block (m :: * -> *).
Codecs addr header block m
-> Codec
(BlockFetch block (Point block)) DeserialiseFailure m ByteString
blockFetchCodec :: Codec
(BlockFetch block (Point block)) DeserialiseFailure m ByteString
blockFetchCodec
, Codec KeepAlive DeserialiseFailure m ByteString
keepAliveCodec :: forall addr header block (m :: * -> *).
Codecs addr header block m
-> Codec KeepAlive DeserialiseFailure m ByteString
keepAliveCodec :: Codec KeepAlive DeserialiseFailure m ByteString
keepAliveCodec, Codec PingPong DeserialiseFailure m ByteString
pingPongCodec :: forall addr header block (m :: * -> *).
Codecs addr header block m
-> Codec PingPong DeserialiseFailure m ByteString
pingPongCodec :: Codec PingPong DeserialiseFailure m ByteString
pingPongCodec
, Codec (PeerSharing NtNAddr) DeserialiseFailure m ByteString
peerSharingCodec :: forall addr header block (m :: * -> *).
Codecs addr header block m
-> Codec (PeerSharing addr) DeserialiseFailure m ByteString
peerSharingCodec :: Codec (PeerSharing NtNAddr) DeserialiseFailure m ByteString
peerSharingCodec
}
LimitsAndTimeouts header block
limits
AppArgs
{ LedgerPeersConsensusInterface m
aaLedgerPeersConsensusInterface :: forall header block (m :: * -> *).
AppArgs header block m -> LedgerPeersConsensusInterface m
aaLedgerPeersConsensusInterface :: LedgerPeersConsensusInterface m
aaLedgerPeersConsensusInterface
, DiffusionMode
aaDiffusionMode :: forall header block (m :: * -> *).
AppArgs header block m -> DiffusionMode
aaDiffusionMode :: DiffusionMode
aaDiffusionMode
, StdGen
aaKeepAliveStdGen :: forall header block (m :: * -> *). AppArgs header block m -> StdGen
aaKeepAliveStdGen :: StdGen
aaKeepAliveStdGen
, DiffTime
aaKeepAliveInterval :: forall header block (m :: * -> *).
AppArgs header block m -> DiffTime
aaKeepAliveInterval :: DiffTime
aaKeepAliveInterval
, DiffTime
aaPingPongInterval :: forall header block (m :: * -> *).
AppArgs header block m -> DiffTime
aaPingPongInterval :: DiffTime
aaPingPongInterval
, header -> m Bool
aaShouldChainSyncExit :: forall header block (m :: * -> *).
AppArgs header block m -> header -> m Bool
aaShouldChainSyncExit :: header -> m Bool
aaShouldChainSyncExit
, Bool
aaChainSyncEarlyExit :: forall header block (m :: * -> *). AppArgs header block m -> Bool
aaChainSyncEarlyExit :: Bool
aaChainSyncEarlyExit
, PeerSharing
aaOwnPeerSharing :: forall header block (m :: * -> *).
AppArgs header block m -> PeerSharing
aaOwnPeerSharing :: PeerSharing
aaOwnPeerSharing
, OutboundConnectionsState -> STM m ()
aaUpdateOutboundConnectionsState :: forall header block (m :: * -> *).
AppArgs header block m -> OutboundConnectionsState -> STM m ()
aaUpdateOutboundConnectionsState :: OutboundConnectionsState -> STM m ()
aaUpdateOutboundConnectionsState
}
block -> header
toHeader =
Diff.Applications
{ daApplicationInitiatorMode :: Versions
NtNVersion
NtNVersionData
(OuroborosBundleWithExpandedCtx
'InitiatorMode NtNAddr ByteString m () X)
Diff.daApplicationInitiatorMode =
NtNVersion
-> NtNVersionData
-> OuroborosBundleWithExpandedCtx
'InitiatorMode NtNAddr ByteString m () X
-> Versions
NtNVersion
NtNVersionData
(OuroborosBundleWithExpandedCtx
'InitiatorMode NtNAddr ByteString m () X)
forall vNum vData r. vNum -> vData -> r -> Versions vNum vData r
simpleSingletonVersions NtNVersion
UnversionedProtocol
(DiffusionMode -> PeerSharing -> NtNVersionData
NtNVersionData DiffusionMode
InitiatorOnlyDiffusionMode PeerSharing
aaOwnPeerSharing)
OuroborosBundleWithExpandedCtx
'InitiatorMode NtNAddr ByteString m () X
initiatorApp
, daApplicationInitiatorResponderMode :: Versions
NtNVersion
NtNVersionData
(OuroborosBundleWithExpandedCtx
'InitiatorResponderMode NtNAddr ByteString m () ())
Diff.daApplicationInitiatorResponderMode =
NtNVersion
-> NtNVersionData
-> OuroborosBundleWithExpandedCtx
'InitiatorResponderMode NtNAddr ByteString m () ()
-> Versions
NtNVersion
NtNVersionData
(OuroborosBundleWithExpandedCtx
'InitiatorResponderMode NtNAddr ByteString m () ())
forall vNum vData r. vNum -> vData -> r -> Versions vNum vData r
simpleSingletonVersions NtNVersion
UnversionedProtocol
(DiffusionMode -> PeerSharing -> NtNVersionData
NtNVersionData DiffusionMode
aaDiffusionMode PeerSharing
aaOwnPeerSharing)
OuroborosBundleWithExpandedCtx
'InitiatorResponderMode NtNAddr ByteString m () ()
initiatorAndResponderApp
, daLocalResponderApplication :: Versions
NtNVersion
NtCVersionData
(OuroborosApplicationWithMinimalCtx
'ResponderMode NtCAddr ByteString m X ())
Diff.daLocalResponderApplication =
NtNVersion
-> NtCVersionData
-> OuroborosApplicationWithMinimalCtx
'ResponderMode NtCAddr ByteString m X ()
-> Versions
NtNVersion
NtCVersionData
(OuroborosApplicationWithMinimalCtx
'ResponderMode NtCAddr ByteString m X ())
forall vNum vData r. vNum -> vData -> r -> Versions vNum vData r
simpleSingletonVersions NtNVersion
UnversionedProtocol
NtCVersionData
UnversionedProtocolData
OuroborosApplicationWithMinimalCtx
'ResponderMode NtCAddr ByteString m X ()
localResponderApp
, daLedgerPeersCtx :: LedgerPeersConsensusInterface m
Diff.daLedgerPeersCtx =
LedgerPeersConsensusInterface m
aaLedgerPeersConsensusInterface
, daUpdateOutboundConnectionsState :: OutboundConnectionsState -> STM m ()
Diff.daUpdateOutboundConnectionsState =
OutboundConnectionsState -> STM m ()
aaUpdateOutboundConnectionsState
}
where
initiatorApp
:: OuroborosBundleWithExpandedCtx Mx.InitiatorMode NtNAddr ByteString m () Void
initiatorApp :: OuroborosBundleWithExpandedCtx
'InitiatorMode NtNAddr ByteString m () X
initiatorApp = (MiniProtocolWithExpandedCtx
'InitiatorResponderMode NtNAddr ByteString m () ()
-> MiniProtocol
'InitiatorMode
(ExpandedInitiatorContext NtNAddr m)
(ResponderContext NtNAddr)
ByteString
m
()
X)
-> [MiniProtocolWithExpandedCtx
'InitiatorResponderMode NtNAddr ByteString m () ()]
-> [MiniProtocol
'InitiatorMode
(ExpandedInitiatorContext NtNAddr m)
(ResponderContext NtNAddr)
ByteString
m
()
X]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap MiniProtocolWithExpandedCtx
'InitiatorResponderMode NtNAddr ByteString m () ()
-> MiniProtocol
'InitiatorMode
(ExpandedInitiatorContext NtNAddr m)
(ResponderContext NtNAddr)
ByteString
m
()
X
f ([MiniProtocolWithExpandedCtx
'InitiatorResponderMode NtNAddr ByteString m () ()]
-> [MiniProtocol
'InitiatorMode
(ExpandedInitiatorContext NtNAddr m)
(ResponderContext NtNAddr)
ByteString
m
()
X])
-> OuroborosBundleWithExpandedCtx
'InitiatorResponderMode NtNAddr ByteString m () ()
-> OuroborosBundleWithExpandedCtx
'InitiatorMode NtNAddr ByteString m () X
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> OuroborosBundleWithExpandedCtx
'InitiatorResponderMode NtNAddr ByteString m () ()
initiatorAndResponderApp
where
f :: MiniProtocolWithExpandedCtx Mx.InitiatorResponderMode NtNAddr ByteString m () ()
-> MiniProtocolWithExpandedCtx Mx.InitiatorMode NtNAddr ByteString m () Void
f :: MiniProtocolWithExpandedCtx
'InitiatorResponderMode NtNAddr ByteString m () ()
-> MiniProtocol
'InitiatorMode
(ExpandedInitiatorContext NtNAddr m)
(ResponderContext NtNAddr)
ByteString
m
()
X
f MiniProtocol { MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum :: forall (mode :: Mode) initiatorCtx responderCtx bytes (m :: * -> *)
a b.
MiniProtocol mode initiatorCtx responderCtx bytes m a b
-> MiniProtocolNum
miniProtocolNum
, MiniProtocolLimits
miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits :: forall (mode :: Mode) initiatorCtx responderCtx bytes (m :: * -> *)
a b.
MiniProtocol mode initiatorCtx responderCtx bytes m a b
-> MiniProtocolLimits
miniProtocolLimits
, RunMiniProtocol
'InitiatorResponderMode
(ExpandedInitiatorContext NtNAddr m)
(ResponderContext NtNAddr)
ByteString
m
()
()
miniProtocolRun :: RunMiniProtocol
'InitiatorResponderMode
(ExpandedInitiatorContext NtNAddr m)
(ResponderContext NtNAddr)
ByteString
m
()
()
miniProtocolRun :: forall (mode :: Mode) initiatorCtx responderCtx bytes (m :: * -> *)
a b.
MiniProtocol mode initiatorCtx responderCtx bytes m a b
-> RunMiniProtocol mode initiatorCtx responderCtx bytes m a b
miniProtocolRun } =
MiniProtocol { MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum
, MiniProtocolLimits
miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits
, miniProtocolRun :: RunMiniProtocol
'InitiatorMode
(ExpandedInitiatorContext NtNAddr m)
(ResponderContext NtNAddr)
ByteString
m
()
X
miniProtocolRun =
case RunMiniProtocol
'InitiatorResponderMode
(ExpandedInitiatorContext NtNAddr m)
(ResponderContext NtNAddr)
ByteString
m
()
()
miniProtocolRun of
InitiatorAndResponderProtocol MiniProtocolCb (ExpandedInitiatorContext NtNAddr m) ByteString m ()
initiator MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
_respnder ->
MiniProtocolCb (ExpandedInitiatorContext NtNAddr m) ByteString m ()
-> RunMiniProtocol
'InitiatorMode
(ExpandedInitiatorContext NtNAddr m)
(ResponderContext NtNAddr)
ByteString
m
()
X
forall initiatorCtx bytes (m :: * -> *) a responderCtx.
MiniProtocolCb initiatorCtx bytes m a
-> RunMiniProtocol
'InitiatorMode initiatorCtx responderCtx bytes m a X
InitiatorProtocolOnly MiniProtocolCb (ExpandedInitiatorContext NtNAddr m) ByteString m ()
initiator
}
initiatorAndResponderApp
:: OuroborosBundleWithExpandedCtx Mx.InitiatorResponderMode NtNAddr ByteString m () ()
initiatorAndResponderApp :: OuroborosBundleWithExpandedCtx
'InitiatorResponderMode NtNAddr ByteString m () ()
initiatorAndResponderApp = TemperatureBundle
{ withHot :: WithProtocolTemperature
'Hot
[MiniProtocolWithExpandedCtx
'InitiatorResponderMode NtNAddr ByteString m () ()]
withHot = [MiniProtocolWithExpandedCtx
'InitiatorResponderMode NtNAddr ByteString m () ()]
-> WithProtocolTemperature
'Hot
[MiniProtocolWithExpandedCtx
'InitiatorResponderMode NtNAddr ByteString m () ()]
forall a. a -> WithProtocolTemperature 'Hot a
WithHot
[ MiniProtocol
{ miniProtocolNum :: MiniProtocolNum
miniProtocolNum = MiniProtocolNum
chainSyncMiniProtocolNum
, miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits = LimitsAndTimeouts header block -> MiniProtocolLimits
forall header block.
LimitsAndTimeouts header block -> MiniProtocolLimits
chainSyncLimits LimitsAndTimeouts header block
limits
, miniProtocolRun :: RunMiniProtocol
'InitiatorResponderMode
(ExpandedInitiatorContext NtNAddr m)
(ResponderContext NtNAddr)
ByteString
m
()
()
miniProtocolRun =
MiniProtocolCb (ExpandedInitiatorContext NtNAddr m) ByteString m ()
-> MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
-> RunMiniProtocol
'InitiatorResponderMode
(ExpandedInitiatorContext NtNAddr m)
(ResponderContext NtNAddr)
ByteString
m
()
()
forall initiatorCtx bytes (m :: * -> *) a responderCtx b.
MiniProtocolCb initiatorCtx bytes m a
-> MiniProtocolCb responderCtx bytes m b
-> RunMiniProtocol
'InitiatorResponderMode initiatorCtx responderCtx bytes m a b
InitiatorAndResponderProtocol
MiniProtocolCb (ExpandedInitiatorContext NtNAddr m) ByteString m ()
chainSyncInitiator
MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
chainSyncResponder
}
, MiniProtocol
{ miniProtocolNum :: MiniProtocolNum
miniProtocolNum = MiniProtocolNum
blockFetchMiniProtocolNum
, miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits = LimitsAndTimeouts header block -> MiniProtocolLimits
forall header block.
LimitsAndTimeouts header block -> MiniProtocolLimits
blockFetchLimits LimitsAndTimeouts header block
limits
, miniProtocolRun :: RunMiniProtocol
'InitiatorResponderMode
(ExpandedInitiatorContext NtNAddr m)
(ResponderContext NtNAddr)
ByteString
m
()
()
miniProtocolRun =
MiniProtocolCb (ExpandedInitiatorContext NtNAddr m) ByteString m ()
-> MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
-> RunMiniProtocol
'InitiatorResponderMode
(ExpandedInitiatorContext NtNAddr m)
(ResponderContext NtNAddr)
ByteString
m
()
()
forall initiatorCtx bytes (m :: * -> *) a responderCtx b.
MiniProtocolCb initiatorCtx bytes m a
-> MiniProtocolCb responderCtx bytes m b
-> RunMiniProtocol
'InitiatorResponderMode initiatorCtx responderCtx bytes m a b
InitiatorAndResponderProtocol
MiniProtocolCb (ExpandedInitiatorContext NtNAddr m) ByteString m ()
blockFetchInitiator
MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
blockFetchResponder
}
]
, withWarm :: WithProtocolTemperature
'Warm
[MiniProtocolWithExpandedCtx
'InitiatorResponderMode NtNAddr ByteString m () ()]
withWarm = [MiniProtocolWithExpandedCtx
'InitiatorResponderMode NtNAddr ByteString m () ()]
-> WithProtocolTemperature
'Warm
[MiniProtocolWithExpandedCtx
'InitiatorResponderMode NtNAddr ByteString m () ()]
forall a. a -> WithProtocolTemperature 'Warm a
WithWarm
[ MiniProtocol
{ miniProtocolNum :: MiniProtocolNum
miniProtocolNum = Word16 -> MiniProtocolNum
MiniProtocolNum Word16
9
, miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits = LimitsAndTimeouts header block -> MiniProtocolLimits
forall header block.
LimitsAndTimeouts header block -> MiniProtocolLimits
pingPongLimits LimitsAndTimeouts header block
limits
, miniProtocolRun :: RunMiniProtocol
'InitiatorResponderMode
(ExpandedInitiatorContext NtNAddr m)
(ResponderContext NtNAddr)
ByteString
m
()
()
miniProtocolRun =
MiniProtocolCb (ExpandedInitiatorContext NtNAddr m) ByteString m ()
-> MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
-> RunMiniProtocol
'InitiatorResponderMode
(ExpandedInitiatorContext NtNAddr m)
(ResponderContext NtNAddr)
ByteString
m
()
()
forall initiatorCtx bytes (m :: * -> *) a responderCtx b.
MiniProtocolCb initiatorCtx bytes m a
-> MiniProtocolCb responderCtx bytes m b
-> RunMiniProtocol
'InitiatorResponderMode initiatorCtx responderCtx bytes m a b
InitiatorAndResponderProtocol
MiniProtocolCb (ExpandedInitiatorContext NtNAddr m) ByteString m ()
pingPongInitiator
MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
pingPongResponder
}
]
, withEstablished :: WithProtocolTemperature
'Established
[MiniProtocolWithExpandedCtx
'InitiatorResponderMode NtNAddr ByteString m () ()]
withEstablished = [MiniProtocolWithExpandedCtx
'InitiatorResponderMode NtNAddr ByteString m () ()]
-> WithProtocolTemperature
'Established
[MiniProtocolWithExpandedCtx
'InitiatorResponderMode NtNAddr ByteString m () ()]
forall a. a -> WithProtocolTemperature 'Established a
WithEstablished ([MiniProtocolWithExpandedCtx
'InitiatorResponderMode NtNAddr ByteString m () ()]
-> WithProtocolTemperature
'Established
[MiniProtocolWithExpandedCtx
'InitiatorResponderMode NtNAddr ByteString m () ()])
-> [MiniProtocolWithExpandedCtx
'InitiatorResponderMode NtNAddr ByteString m () ()]
-> WithProtocolTemperature
'Established
[MiniProtocolWithExpandedCtx
'InitiatorResponderMode NtNAddr ByteString m () ()]
forall a b. (a -> b) -> a -> b
$
[ MiniProtocol
{ miniProtocolNum :: MiniProtocolNum
miniProtocolNum = MiniProtocolNum
keepAliveMiniProtocolNum
, miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits = LimitsAndTimeouts header block -> MiniProtocolLimits
forall header block.
LimitsAndTimeouts header block -> MiniProtocolLimits
keepAliveLimits LimitsAndTimeouts header block
limits
, miniProtocolRun :: RunMiniProtocol
'InitiatorResponderMode
(ExpandedInitiatorContext NtNAddr m)
(ResponderContext NtNAddr)
ByteString
m
()
()
miniProtocolRun =
MiniProtocolCb (ExpandedInitiatorContext NtNAddr m) ByteString m ()
-> MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
-> RunMiniProtocol
'InitiatorResponderMode
(ExpandedInitiatorContext NtNAddr m)
(ResponderContext NtNAddr)
ByteString
m
()
()
forall initiatorCtx bytes (m :: * -> *) a responderCtx b.
MiniProtocolCb initiatorCtx bytes m a
-> MiniProtocolCb responderCtx bytes m b
-> RunMiniProtocol
'InitiatorResponderMode initiatorCtx responderCtx bytes m a b
InitiatorAndResponderProtocol
MiniProtocolCb (ExpandedInitiatorContext NtNAddr m) ByteString m ()
keepAliveInitiator
MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
keepAliveResponder
}
] [MiniProtocolWithExpandedCtx
'InitiatorResponderMode NtNAddr ByteString m () ()]
-> [MiniProtocolWithExpandedCtx
'InitiatorResponderMode NtNAddr ByteString m () ()]
-> [MiniProtocolWithExpandedCtx
'InitiatorResponderMode NtNAddr ByteString m () ()]
forall a. [a] -> [a] -> [a]
++ if PeerSharing
aaOwnPeerSharing PeerSharing -> PeerSharing -> Bool
forall a. Eq a => a -> a -> Bool
/= PeerSharing
PSTypes.PeerSharingDisabled
then [ MiniProtocol
{ miniProtocolNum :: MiniProtocolNum
miniProtocolNum = MiniProtocolNum
peerSharingMiniProtocolNum
, miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits = LimitsAndTimeouts header block -> MiniProtocolLimits
forall header block.
LimitsAndTimeouts header block -> MiniProtocolLimits
peerSharingLimits LimitsAndTimeouts header block
limits
, miniProtocolRun :: RunMiniProtocol
'InitiatorResponderMode
(ExpandedInitiatorContext NtNAddr m)
(ResponderContext NtNAddr)
ByteString
m
()
()
miniProtocolRun =
MiniProtocolCb (ExpandedInitiatorContext NtNAddr m) ByteString m ()
-> MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
-> RunMiniProtocol
'InitiatorResponderMode
(ExpandedInitiatorContext NtNAddr m)
(ResponderContext NtNAddr)
ByteString
m
()
()
forall initiatorCtx bytes (m :: * -> *) a responderCtx b.
MiniProtocolCb initiatorCtx bytes m a
-> MiniProtocolCb responderCtx bytes m b
-> RunMiniProtocol
'InitiatorResponderMode initiatorCtx responderCtx bytes m a b
InitiatorAndResponderProtocol
MiniProtocolCb (ExpandedInitiatorContext NtNAddr m) ByteString m ()
peerSharingInitiator
(PeerSharingAPI NtNAddr s m
-> MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
peerSharingResponder (NodeKernel header block s m -> PeerSharingAPI NtNAddr s m
forall header block s (m :: * -> *).
NodeKernel header block s m -> PeerSharingAPI NtNAddr s m
nkPeerSharingAPI NodeKernel header block s m
nodeKernel))
}
]
else []
}
localResponderApp
:: OuroborosApplicationWithMinimalCtx
Mx.ResponderMode NtCAddr ByteString m Void ()
localResponderApp :: OuroborosApplicationWithMinimalCtx
'ResponderMode NtCAddr ByteString m X ()
localResponderApp = [MiniProtocol
'ResponderMode
(MinimalInitiatorContext NtCAddr)
(ResponderContext NtCAddr)
ByteString
m
X
()]
-> OuroborosApplicationWithMinimalCtx
'ResponderMode NtCAddr ByteString m X ()
forall (mode :: Mode) initiatorCtx responderCtx bytes (m :: * -> *)
a b.
[MiniProtocol mode initiatorCtx responderCtx bytes m a b]
-> OuroborosApplication mode initiatorCtx responderCtx bytes m a b
OuroborosApplication []
chainSyncInitiator
:: MiniProtocolCb (ExpandedInitiatorContext NtNAddr m) ByteString m ()
chainSyncInitiator :: MiniProtocolCb (ExpandedInitiatorContext NtNAddr m) ByteString m ()
chainSyncInitiator =
(ExpandedInitiatorContext NtNAddr m
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb
(ExpandedInitiatorContext NtNAddr m) ByteString m ()
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb ((ExpandedInitiatorContext NtNAddr m
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb
(ExpandedInitiatorContext NtNAddr m) ByteString m ())
-> (ExpandedInitiatorContext NtNAddr m
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb
(ExpandedInitiatorContext NtNAddr m) ByteString m ()
forall a b. (a -> b) -> a -> b
$
\ ExpandedInitiatorContext {
eicConnectionId :: forall addr (m :: * -> *).
ExpandedInitiatorContext addr m -> ConnectionId addr
eicConnectionId = ConnectionId NtNAddr
connId,
eicControlMessage :: forall addr (m :: * -> *).
ExpandedInitiatorContext addr m -> ControlMessageSTM m
eicControlMessage = ControlMessageSTM m
controlMessageSTM
}
Channel m ByteString
channel
->
let client :: Client header point tip m ()
client :: forall point tip. Client header point tip m ()
client = Client header point tip m ()
go
where
go :: Client header point tip m ()
go = Client
{ rollbackward :: point -> tip -> m (Either () (Client header point tip m ()))
rollbackward = \point
_ tip
_ -> do
ctrl <- ControlMessageSTM m -> m ControlMessage
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically ControlMessageSTM m
controlMessageSTM
case ctrl of
ControlMessage
Continue -> Either () (Client header point tip m ())
-> m (Either () (Client header point tip m ()))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Client header point tip m ()
-> Either () (Client header point tip m ())
forall a b. b -> Either a b
Right Client header point tip m ()
go)
ControlMessage
Quiesce -> String -> m (Either () (Client header point tip m ()))
forall a. HasCallStack => String -> a
error String
"Ouroboros.Network.Protocol.ChainSync.Examples.controlledClient: unexpected Quiesce"
ControlMessage
Terminate -> Either () (Client header point tip m ())
-> m (Either () (Client header point tip m ()))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (() -> Either () (Client header point tip m ())
forall a b. a -> Either a b
Left ())
, rollforward :: header -> m (Either () (Client header point tip m ()))
rollforward = \header
header -> do
exit <- header -> m Bool
aaShouldChainSyncExit header
header
if exit
then pure (Left ())
else do ctrl <- atomically controlMessageSTM
case ctrl of
ControlMessage
Continue -> Either () (Client header point tip m ())
-> m (Either () (Client header point tip m ()))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Client header point tip m ()
-> Either () (Client header point tip m ())
forall a b. b -> Either a b
Right Client header point tip m ()
go)
ControlMessage
Quiesce -> String -> m (Either () (Client header point tip m ()))
forall a. HasCallStack => String -> a
error String
"Ouroboros.Network.Protocol.ChainSync.Examples.controlledClient: unexpected Quiesce"
ControlMessage
Terminate -> Either () (Client header point tip m ())
-> m (Either () (Client header point tip m ()))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (() -> Either () (Client header point tip m ())
forall a b. a -> Either a b
Left ())
, points :: [point] -> m (Either () (Client header point tip m ()))
points = \[point]
_ -> Either () (Client header point tip m ())
-> m (Either () (Client header point tip m ()))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either () (Client header point tip m ())
-> m (Either () (Client header point tip m ())))
-> Either () (Client header point tip m ())
-> m (Either () (Client header point tip m ()))
forall a b. (a -> b) -> a -> b
$
if Bool
aaChainSyncEarlyExit
then () -> Either () (Client header point tip m ())
forall a b. a -> Either a b
Left ()
else Client header point tip m ()
-> Either () (Client header point tip m ())
forall a b. b -> Either a b
Right Client header point tip m ()
go
}
in do String -> m ()
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread String
"ChainSyncClient"
FetchClientRegistry NtNAddr header block m
-> NtNAddr -> m ((), Maybe ByteString) -> m ((), Maybe ByteString)
forall (m :: * -> *) a peer header block.
(MonadSTM m, MonadFork m, MonadCatch m, Ord peer) =>
FetchClientRegistry peer header block m -> peer -> m a -> m a
bracketSyncWithFetchClient (NodeKernel header block s m
-> FetchClientRegistry NtNAddr header block m
forall header block s (m :: * -> *).
NodeKernel header block s m
-> FetchClientRegistry NtNAddr header block m
nkFetchClientRegistry NodeKernel header block s m
nodeKernel)
(ConnectionId NtNAddr -> NtNAddr
forall addr. ConnectionId addr -> addr
remoteAddress ConnectionId NtNAddr
connId) (m ((), Maybe ByteString) -> m ((), Maybe ByteString))
-> m ((), Maybe ByteString) -> m ((), Maybe ByteString)
forall a b. (a -> b) -> a -> b
$
m (StrictTVar m (Chain header))
-> (StrictTVar m (Chain header) -> m ())
-> (StrictTVar m (Chain header) -> m ((), Maybe ByteString))
-> m ((), Maybe ByteString)
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket (NodeKernel header block s m
-> NtNAddr -> m (StrictTVar m (Chain header))
forall (m :: * -> *) header block s.
MonadSTM m =>
NodeKernel header block s m
-> NtNAddr -> m (StrictTVar m (Chain header))
registerClientChains NodeKernel header block s m
nodeKernel (ConnectionId NtNAddr -> NtNAddr
forall addr. ConnectionId addr -> addr
remoteAddress ConnectionId NtNAddr
connId))
(\StrictTVar m (Chain header)
_ -> NodeKernel header block s m -> NtNAddr -> m ()
forall (m :: * -> *) header block s.
MonadSTM m =>
NodeKernel header block s m -> NtNAddr -> m ()
unregisterClientChains NodeKernel header block s m
nodeKernel (ConnectionId NtNAddr -> NtNAddr
forall addr. ConnectionId addr -> addr
remoteAddress ConnectionId NtNAddr
connId))
(\StrictTVar m (Chain header)
chainVar ->
Tracer
m (TraceSendRecv (ChainSync header (Point block) (Tip block)))
-> Codec
(ChainSync header (Point block) (Tip block))
DeserialiseFailure
m
ByteString
-> ProtocolSizeLimits
(ChainSync header (Point block) (Tip block)) ByteString
-> ProtocolTimeLimits (ChainSync header (Point block) (Tip block))
-> Channel m ByteString
-> Peer
(ChainSync header (Point block) (Tip block))
'AsClient
'NonPipelined
'StIdle
m
()
-> m ((), Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
MonadTimer m, ShowProxy ps,
forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runPeerWithLimits
Tracer
m (TraceSendRecv (ChainSync header (Point block) (Tip block)))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
Codec
(ChainSync header (Point block) (Tip block))
DeserialiseFailure
m
ByteString
chainSyncCodec
(LimitsAndTimeouts header block
-> ProtocolSizeLimits
(ChainSync header (Point block) (Tip block)) ByteString
forall header block.
LimitsAndTimeouts header block
-> ProtocolSizeLimits
(ChainSync header (Point block) (Tip block)) ByteString
chainSyncSizeLimits LimitsAndTimeouts header block
limits)
(LimitsAndTimeouts header block
-> ProtocolTimeLimits (ChainSync header (Point block) (Tip block))
forall header block.
LimitsAndTimeouts header block
-> ProtocolTimeLimits (ChainSync header (Point block) (Tip block))
chainSyncTimeLimits LimitsAndTimeouts header block
limits)
Channel m ByteString
channel
(ChainSyncClient header (Point block) (Tip block) m ()
-> Peer
(ChainSync header (Point block) (Tip block))
'AsClient
'NonPipelined
'StIdle
m
()
forall header point tip (m :: * -> *) a.
Monad m =>
ChainSyncClient header point tip m a
-> Client (ChainSync header point tip) 'NonPipelined 'StIdle m a
chainSyncClientPeer (ChainSyncClient header (Point block) (Tip block) m ()
-> Peer
(ChainSync header (Point block) (Tip block))
'AsClient
'NonPipelined
'StIdle
m
())
-> ChainSyncClient header (Point block) (Tip block) m ()
-> Peer
(ChainSync header (Point block) (Tip block))
'AsClient
'NonPipelined
'StIdle
m
()
forall a b. (a -> b) -> a -> b
$
StrictTVar m (Chain header)
-> Client header (Point block) (Tip block) m ()
-> ChainSyncClient header (Point block) (Tip block) m ()
forall header block tip (m :: * -> *) a.
(HasHeader header, HasHeader block,
HeaderHash header ~ HeaderHash block, MonadSTM m) =>
StrictTVar m (Chain header)
-> Client header (Point block) tip m a
-> ChainSyncClient header (Point block) tip m a
chainSyncClientExample
StrictTVar m (Chain header)
chainVar
Client header (Point block) (Tip block) m ()
forall point tip. Client header point tip m ()
client)
)
chainSyncResponder
:: MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
chainSyncResponder :: MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
chainSyncResponder = (ResponderContext NtNAddr
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb ((ResponderContext NtNAddr
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb (ResponderContext NtNAddr) ByteString m ())
-> (ResponderContext NtNAddr
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
forall a b. (a -> b) -> a -> b
$ \ResponderContext NtNAddr
_ctx Channel m ByteString
channel -> do
String -> m ()
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread String
"ChainSyncServer"
Tracer
m (TraceSendRecv (ChainSync header (Point block) (Tip block)))
-> Codec
(ChainSync header (Point block) (Tip block))
DeserialiseFailure
m
ByteString
-> ProtocolSizeLimits
(ChainSync header (Point block) (Tip block)) ByteString
-> ProtocolTimeLimits (ChainSync header (Point block) (Tip block))
-> Channel m ByteString
-> Peer
(ChainSync header (Point block) (Tip block))
'AsServer
'NonPipelined
'StIdle
m
()
-> m ((), Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
MonadTimer m, ShowProxy ps,
forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runPeerWithLimits
Tracer
m (TraceSendRecv (ChainSync header (Point block) (Tip block)))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
Codec
(ChainSync header (Point block) (Tip block))
DeserialiseFailure
m
ByteString
chainSyncCodec
(LimitsAndTimeouts header block
-> ProtocolSizeLimits
(ChainSync header (Point block) (Tip block)) ByteString
forall header block.
LimitsAndTimeouts header block
-> ProtocolSizeLimits
(ChainSync header (Point block) (Tip block)) ByteString
chainSyncSizeLimits LimitsAndTimeouts header block
limits)
(LimitsAndTimeouts header block
-> ProtocolTimeLimits (ChainSync header (Point block) (Tip block))
forall header block.
LimitsAndTimeouts header block
-> ProtocolTimeLimits (ChainSync header (Point block) (Tip block))
chainSyncTimeLimits LimitsAndTimeouts header block
limits)
Channel m ByteString
channel
(ChainSyncServer header (Point block) (Tip block) m ()
-> Peer
(ChainSync header (Point block) (Tip block))
'AsServer
'NonPipelined
'StIdle
m
()
forall header point tip (m :: * -> *) a.
Monad m =>
ChainSyncServer header point tip m a
-> Server (ChainSync header point tip) 'NonPipelined 'StIdle m a
chainSyncServerPeer
(()
-> StrictTVar m (ChainProducerState block)
-> (block -> header)
-> ChainSyncServer header (Point block) (Tip block) m ()
forall blk header (m :: * -> *) a.
(HasHeader blk, MonadSTM m, HeaderHash header ~ HeaderHash blk) =>
a
-> StrictTVar m (ChainProducerState blk)
-> (blk -> header)
-> ChainSyncServer header (Point blk) (Tip blk) m a
chainSyncServerExample
() (NodeKernel header block s m
-> StrictTVar m (ChainProducerState block)
forall header block s (m :: * -> *).
NodeKernel header block s m
-> StrictTVar m (ChainProducerState block)
nkChainProducerState NodeKernel header block s m
nodeKernel) block -> header
toHeader))
blockFetchInitiator
:: MiniProtocolCb (ExpandedInitiatorContext NtNAddr m) ByteString m ()
blockFetchInitiator :: MiniProtocolCb (ExpandedInitiatorContext NtNAddr m) ByteString m ()
blockFetchInitiator =
(ExpandedInitiatorContext NtNAddr m
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb
(ExpandedInitiatorContext NtNAddr m) ByteString m ()
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb ((ExpandedInitiatorContext NtNAddr m
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb
(ExpandedInitiatorContext NtNAddr m) ByteString m ())
-> (ExpandedInitiatorContext NtNAddr m
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb
(ExpandedInitiatorContext NtNAddr m) ByteString m ()
forall a b. (a -> b) -> a -> b
$
\ ExpandedInitiatorContext {
eicConnectionId :: forall addr (m :: * -> *).
ExpandedInitiatorContext addr m -> ConnectionId addr
eicConnectionId = ConnectionId { NtNAddr
remoteAddress :: forall addr. ConnectionId addr -> addr
remoteAddress :: NtNAddr
remoteAddress },
eicControlMessage :: forall addr (m :: * -> *).
ExpandedInitiatorContext addr m -> ControlMessageSTM m
eicControlMessage = ControlMessageSTM m
controlMessageSTM
}
Channel m ByteString
channel
-> do String -> m ()
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread String
"BlockFetchClient"
FetchClientRegistry NtNAddr header block m
-> NtNVersion
-> NtNAddr
-> (FetchClientContext header block m -> m ((), Maybe ByteString))
-> m ((), Maybe ByteString)
forall (m :: * -> *) a peer header block version.
(MonadFork m, MonadMask m, MonadTimer m, Ord peer) =>
FetchClientRegistry peer header block m
-> version
-> peer
-> (FetchClientContext header block m -> m a)
-> m a
bracketFetchClient (NodeKernel header block s m
-> FetchClientRegistry NtNAddr header block m
forall header block s (m :: * -> *).
NodeKernel header block s m
-> FetchClientRegistry NtNAddr header block m
nkFetchClientRegistry NodeKernel header block s m
nodeKernel)
NtNVersion
UnversionedProtocol
NtNAddr
remoteAddress
((FetchClientContext header block m -> m ((), Maybe ByteString))
-> m ((), Maybe ByteString))
-> (FetchClientContext header block m -> m ((), Maybe ByteString))
-> m ((), Maybe ByteString)
forall a b. (a -> b) -> a -> b
$ \FetchClientContext header block m
clientCtx ->
Tracer m (TraceSendRecv (BlockFetch block (Point block)))
-> Codec
(BlockFetch block (Point block)) DeserialiseFailure m ByteString
-> ProtocolSizeLimits (BlockFetch block (Point block)) ByteString
-> ProtocolTimeLimits (BlockFetch block (Point block))
-> Channel m ByteString
-> Peer
(BlockFetch block (Point block))
'AsClient
'NonPipelined
'BFIdle
m
()
-> m ((), Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
MonadTimer m, ShowProxy ps,
forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runPeerWithLimits
Tracer m (TraceSendRecv (BlockFetch block (Point block)))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
Codec
(BlockFetch block (Point block)) DeserialiseFailure m ByteString
blockFetchCodec
(LimitsAndTimeouts header block
-> ProtocolSizeLimits (BlockFetch block (Point block)) ByteString
forall header block.
LimitsAndTimeouts header block
-> ProtocolSizeLimits (BlockFetch block (Point block)) ByteString
blockFetchSizeLimits LimitsAndTimeouts header block
limits)
(LimitsAndTimeouts header block
-> ProtocolTimeLimits (BlockFetch block (Point block))
forall header block.
LimitsAndTimeouts header block
-> ProtocolTimeLimits (BlockFetch block (Point block))
blockFetchTimeLimits LimitsAndTimeouts header block
limits)
Channel m ByteString
channel
([Bool]
-> PeerPipelined
(BlockFetch block (Point block)) 'AsClient 'BFIdle m ()
-> Peer
(BlockFetch block (Point block))
'AsClient
'NonPipelined
'BFIdle
m
()
forall ps (pr :: PeerRole) (st :: ps) (m :: * -> *) a.
Functor m =>
[Bool]
-> PeerPipelined ps pr st m a -> Peer ps pr 'NonPipelined st m a
forgetPipelined []
(PeerPipelined
(BlockFetch block (Point block)) 'AsClient 'BFIdle m ()
-> Peer
(BlockFetch block (Point block))
'AsClient
'NonPipelined
'BFIdle
m
())
-> PeerPipelined
(BlockFetch block (Point block)) 'AsClient 'BFIdle m ()
-> Peer
(BlockFetch block (Point block))
'AsClient
'NonPipelined
'BFIdle
m
()
forall a b. (a -> b) -> a -> b
$ NtNVersion
-> ControlMessageSTM m
-> FetchedMetricsTracer m
-> FetchClientContext header block m
-> PeerPipelined
(BlockFetch block (Point block)) 'AsClient 'BFIdle m ()
forall header block versionNumber (m :: * -> *).
(MonadSTM m, MonadThrow m, MonadTime m, MonadMonotonicTime m,
HasHeader header, HasHeader block,
HeaderHash header ~ HeaderHash block) =>
versionNumber
-> ControlMessageSTM m
-> FetchedMetricsTracer m
-> FetchClientContext header block m
-> ClientPipelined (BlockFetch block (Point block)) 'BFIdle m ()
blockFetchClient NtNVersion
UnversionedProtocol ControlMessageSTM m
controlMessageSTM
FetchedMetricsTracer m
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer FetchClientContext header block m
clientCtx)
blockFetchResponder
:: MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
blockFetchResponder :: MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
blockFetchResponder =
(ResponderContext NtNAddr
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb ((ResponderContext NtNAddr
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb (ResponderContext NtNAddr) ByteString m ())
-> (ResponderContext NtNAddr
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
forall a b. (a -> b) -> a -> b
$ \ResponderContext NtNAddr
_ctx Channel m ByteString
channel -> do
String -> m ()
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread String
"BlockFetchServer"
Tracer m (TraceSendRecv (BlockFetch block (Point block)))
-> Codec
(BlockFetch block (Point block)) DeserialiseFailure m ByteString
-> ProtocolSizeLimits (BlockFetch block (Point block)) ByteString
-> ProtocolTimeLimits (BlockFetch block (Point block))
-> Channel m ByteString
-> Peer
(BlockFetch block (Point block))
'AsServer
'NonPipelined
'BFIdle
m
()
-> m ((), Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
MonadTimer m, ShowProxy ps,
forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runPeerWithLimits
Tracer m (TraceSendRecv (BlockFetch block (Point block)))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
Codec
(BlockFetch block (Point block)) DeserialiseFailure m ByteString
blockFetchCodec
(LimitsAndTimeouts header block
-> ProtocolSizeLimits (BlockFetch block (Point block)) ByteString
forall header block.
LimitsAndTimeouts header block
-> ProtocolSizeLimits (BlockFetch block (Point block)) ByteString
blockFetchSizeLimits LimitsAndTimeouts header block
limits)
(LimitsAndTimeouts header block
-> ProtocolTimeLimits (BlockFetch block (Point block))
forall header block.
LimitsAndTimeouts header block
-> ProtocolTimeLimits (BlockFetch block (Point block))
blockFetchTimeLimits LimitsAndTimeouts header block
limits)
Channel m ByteString
channel
(BlockFetchServer block (Point block) m ()
-> Peer
(BlockFetch block (Point block))
'AsServer
'NonPipelined
'BFIdle
m
()
forall block point (m :: * -> *) a.
Functor m =>
BlockFetchServer block point m a
-> Server (BlockFetch block point) 'NonPipelined 'BFIdle m a
blockFetchServerPeer (BlockFetchServer block (Point block) m ()
-> Peer
(BlockFetch block (Point block))
'AsServer
'NonPipelined
'BFIdle
m
())
-> BlockFetchServer block (Point block) m ()
-> Peer
(BlockFetch block (Point block))
'AsServer
'NonPipelined
'BFIdle
m
()
forall a b. (a -> b) -> a -> b
$
RangeRequests m block -> BlockFetchServer block (Point block) m ()
forall (m :: * -> *) block.
Monad m =>
RangeRequests m block -> BlockFetchServer block (Point block) m ()
blockFetchServer
((ChainRange (Point block) -> Producer block m ())
-> RangeRequests m block
forall (m :: * -> *) block.
Monad m =>
(ChainRange (Point block) -> Producer block m ())
-> RangeRequests m block
constantRangeRequests ((ChainRange (Point block) -> Producer block m ())
-> RangeRequests m block)
-> (ChainRange (Point block) -> Producer block m ())
-> RangeRequests m block
forall a b. (a -> b) -> a -> b
$ \(ChainRange Point block
from Point block
to) -> do
nkChainProducer <- m (ChainProducerState block)
-> Proxy X () () block m (ChainProducerState block)
forall (m :: * -> *) a. Monad m => m a -> Proxy X () () block m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
Pipes.lift
(m (ChainProducerState block)
-> Proxy X () () block m (ChainProducerState block))
-> m (ChainProducerState block)
-> Proxy X () () block m (ChainProducerState block)
forall a b. (a -> b) -> a -> b
$ StrictTVar m (ChainProducerState block)
-> m (ChainProducerState block)
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> m a
readTVarIO (NodeKernel header block s m
-> StrictTVar m (ChainProducerState block)
forall header block s (m :: * -> *).
NodeKernel header block s m
-> StrictTVar m (ChainProducerState block)
nkChainProducerState NodeKernel header block s m
nodeKernel)
Pipes.each $ fromMaybe []
$ Chain.selectBlockRange (chainState nkChainProducer)
from
to
)
)
keepAliveInitiator
:: MiniProtocolCb (ExpandedInitiatorContext NtNAddr m) ByteString m ()
keepAliveInitiator :: MiniProtocolCb (ExpandedInitiatorContext NtNAddr m) ByteString m ()
keepAliveInitiator =
(ExpandedInitiatorContext NtNAddr m
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb
(ExpandedInitiatorContext NtNAddr m) ByteString m ()
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb ((ExpandedInitiatorContext NtNAddr m
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb
(ExpandedInitiatorContext NtNAddr m) ByteString m ())
-> (ExpandedInitiatorContext NtNAddr m
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb
(ExpandedInitiatorContext NtNAddr m) ByteString m ()
forall a b. (a -> b) -> a -> b
$
\ ExpandedInitiatorContext {
eicConnectionId :: forall addr (m :: * -> *).
ExpandedInitiatorContext addr m -> ConnectionId addr
eicConnectionId = connId :: ConnectionId NtNAddr
connId@ConnectionId { NtNAddr
remoteAddress :: forall addr. ConnectionId addr -> addr
remoteAddress :: NtNAddr
remoteAddress },
eicControlMessage :: forall addr (m :: * -> *).
ExpandedInitiatorContext addr m -> ControlMessageSTM m
eicControlMessage = ControlMessageSTM m
controlMessageSTM
}
Channel m ByteString
channel
-> do String -> m ()
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread String
"KeepAliveClient"
let kacApp :: StrictTVar m (Map NtNAddr PeerGSV) -> m ((), Maybe ByteString)
kacApp =
\StrictTVar m (Map NtNAddr PeerGSV)
ctxVar -> Tracer m (TraceSendRecv KeepAlive)
-> Codec KeepAlive DeserialiseFailure m ByteString
-> ProtocolSizeLimits KeepAlive ByteString
-> ProtocolTimeLimits KeepAlive
-> Channel m ByteString
-> Peer KeepAlive 'AsClient 'NonPipelined 'StClient m ()
-> m ((), Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
MonadTimer m, ShowProxy ps,
forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runPeerWithLimits
(((ConnectionId NtNAddr, TraceSendRecv KeepAlive) -> String
forall a. Show a => a -> String
show ((ConnectionId NtNAddr, TraceSendRecv KeepAlive) -> String)
-> (TraceSendRecv KeepAlive
-> (ConnectionId NtNAddr, TraceSendRecv KeepAlive))
-> TraceSendRecv KeepAlive
-> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ConnectionId NtNAddr
connId,)) (TraceSendRecv KeepAlive -> String)
-> Tracer m String -> Tracer m (TraceSendRecv KeepAlive)
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer m String
debugTracer)
Codec KeepAlive DeserialiseFailure m ByteString
keepAliveCodec
(LimitsAndTimeouts header block
-> ProtocolSizeLimits KeepAlive ByteString
forall header block.
LimitsAndTimeouts header block
-> ProtocolSizeLimits KeepAlive ByteString
keepAliveSizeLimits LimitsAndTimeouts header block
limits)
(LimitsAndTimeouts header block -> ProtocolTimeLimits KeepAlive
forall header block.
LimitsAndTimeouts header block -> ProtocolTimeLimits KeepAlive
keepAliveTimeLimits LimitsAndTimeouts header block
limits)
Channel m ByteString
channel
(KeepAliveClient m ()
-> Peer KeepAlive 'AsClient 'NonPipelined 'StClient m ()
forall (m :: * -> *) a.
MonadThrow m =>
KeepAliveClient m a -> Client KeepAlive 'NonPipelined 'StClient m a
keepAliveClientPeer (KeepAliveClient m ()
-> Peer KeepAlive 'AsClient 'NonPipelined 'StClient m ())
-> KeepAliveClient m ()
-> Peer KeepAlive 'AsClient 'NonPipelined 'StClient m ()
forall a b. (a -> b) -> a -> b
$
Tracer m (TraceKeepAliveClient NtNAddr)
-> StdGen
-> ControlMessageSTM m
-> NtNAddr
-> StrictTVar m (Map NtNAddr PeerGSV)
-> KeepAliveInterval
-> KeepAliveClient m ()
forall (m :: * -> *) peer.
(MonadTimer m, Ord peer) =>
Tracer m (TraceKeepAliveClient peer)
-> StdGen
-> ControlMessageSTM m
-> peer
-> StrictTVar m (Map peer PeerGSV)
-> KeepAliveInterval
-> KeepAliveClient m ()
keepAliveClient
Tracer m (TraceKeepAliveClient NtNAddr)
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
StdGen
aaKeepAliveStdGen
ControlMessageSTM m
controlMessageSTM
NtNAddr
remoteAddress
StrictTVar m (Map NtNAddr PeerGSV)
ctxVar
(DiffTime -> KeepAliveInterval
KeepAliveInterval DiffTime
aaKeepAliveInterval))
FetchClientRegistry NtNAddr header block m
-> NtNAddr
-> (StrictTVar m (Map NtNAddr PeerGSV) -> m ((), Maybe ByteString))
-> m ((), Maybe ByteString)
forall (m :: * -> *) a peer header block.
(MonadSTM m, MonadFork m, MonadMask m, Ord peer) =>
FetchClientRegistry peer header block m
-> peer -> (StrictTVar m (Map peer PeerGSV) -> m a) -> m a
bracketKeepAliveClient (NodeKernel header block s m
-> FetchClientRegistry NtNAddr header block m
forall header block s (m :: * -> *).
NodeKernel header block s m
-> FetchClientRegistry NtNAddr header block m
nkFetchClientRegistry NodeKernel header block s m
nodeKernel)
NtNAddr
remoteAddress
StrictTVar m (Map NtNAddr PeerGSV) -> m ((), Maybe ByteString)
kacApp
keepAliveResponder
:: MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
keepAliveResponder :: MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
keepAliveResponder = (ResponderContext NtNAddr
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb ((ResponderContext NtNAddr
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb (ResponderContext NtNAddr) ByteString m ())
-> (ResponderContext NtNAddr
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
forall a b. (a -> b) -> a -> b
$ \ResponderContext NtNAddr
_ctx Channel m ByteString
channel -> do
String -> m ()
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread String
"KeepAliveServer"
Tracer m (TraceSendRecv KeepAlive)
-> Codec KeepAlive DeserialiseFailure m ByteString
-> ProtocolSizeLimits KeepAlive ByteString
-> ProtocolTimeLimits KeepAlive
-> Channel m ByteString
-> Peer KeepAlive 'AsServer 'NonPipelined 'StClient m ()
-> m ((), Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
MonadTimer m, ShowProxy ps,
forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runPeerWithLimits
Tracer m (TraceSendRecv KeepAlive)
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
Codec KeepAlive DeserialiseFailure m ByteString
keepAliveCodec
(LimitsAndTimeouts header block
-> ProtocolSizeLimits KeepAlive ByteString
forall header block.
LimitsAndTimeouts header block
-> ProtocolSizeLimits KeepAlive ByteString
keepAliveSizeLimits LimitsAndTimeouts header block
limits)
(LimitsAndTimeouts header block -> ProtocolTimeLimits KeepAlive
forall header block.
LimitsAndTimeouts header block -> ProtocolTimeLimits KeepAlive
keepAliveTimeLimits LimitsAndTimeouts header block
limits)
Channel m ByteString
channel
(KeepAliveServer m ()
-> Peer KeepAlive 'AsServer 'NonPipelined 'StClient m ()
forall (m :: * -> *) a.
Functor m =>
KeepAliveServer m a -> Server KeepAlive 'NonPipelined 'StClient m a
keepAliveServerPeer KeepAliveServer m ()
forall (m :: * -> *). Applicative m => KeepAliveServer m ()
keepAliveServer)
pingPongInitiator
:: MiniProtocolCb (ExpandedInitiatorContext NtNAddr m) ByteString m ()
pingPongInitiator :: MiniProtocolCb (ExpandedInitiatorContext NtNAddr m) ByteString m ()
pingPongInitiator =
(ExpandedInitiatorContext NtNAddr m
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb
(ExpandedInitiatorContext NtNAddr m) ByteString m ()
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb ((ExpandedInitiatorContext NtNAddr m
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb
(ExpandedInitiatorContext NtNAddr m) ByteString m ())
-> (ExpandedInitiatorContext NtNAddr m
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb
(ExpandedInitiatorContext NtNAddr m) ByteString m ()
forall a b. (a -> b) -> a -> b
$
\ ExpandedInitiatorContext {
eicConnectionId :: forall addr (m :: * -> *).
ExpandedInitiatorContext addr m -> ConnectionId addr
eicConnectionId = ConnectionId NtNAddr
connId,
eicControlMessage :: forall addr (m :: * -> *).
ExpandedInitiatorContext addr m -> ControlMessageSTM m
eicControlMessage = ControlMessageSTM m
controlMessageSTM
}
Channel m ByteString
channel
-> let continueSTM :: STM m Bool
continueSTM :: STM m Bool
continueSTM = do
ctrl <- ControlMessageSTM m
controlMessageSTM
case ctrl of
ControlMessage
Continue -> Bool -> STM m Bool
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
ControlMessage
Quiesce -> STM m Bool
forall a. STM m a
forall (m :: * -> *) a. MonadSTM m => STM m a
retry
ControlMessage
Terminate -> Bool -> STM m Bool
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
pingPongClient :: PingPongClient m ()
pingPongClient :: PingPongClient m ()
pingPongClient = m (PingPongClient m ()) -> PingPongClient m ()
forall (m :: * -> *) a.
m (PingPongClient m a) -> PingPongClient m a
SendMsgPing (m (PingPongClient m ()) -> PingPongClient m ())
-> m (PingPongClient m ()) -> PingPongClient m ()
forall a b. (a -> b) -> a -> b
$ do
v <- DiffTime -> m (TVar m Bool)
forall (m :: * -> *). MonadTimer m => DiffTime -> m (TVar m Bool)
registerDelay DiffTime
aaPingPongInterval
continue <- atomically $ runFirstToFinish $
( FirstToFinish $ do
LazySTM.readTVar v >>= check
continueSTM )
<> ( FirstToFinish $ do
continueSTM >>= \Bool
b -> Bool -> STM m ()
forall (m :: * -> *). MonadSTM m => Bool -> STM m ()
check (Bool -> Bool
not Bool
b) STM m () -> Bool -> STM m Bool
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> Bool
b )
if continue
then return pingPongClient
else return $ PingPong.SendMsgDone ()
in Tracer m (TraceSendRecv PingPong)
-> Codec PingPong DeserialiseFailure m ByteString
-> ProtocolSizeLimits PingPong ByteString
-> ProtocolTimeLimits PingPong
-> Channel m ByteString
-> Peer PingPong 'AsClient 'NonPipelined 'StIdle m ()
-> m ((), Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
MonadTimer m, ShowProxy ps,
forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runPeerWithLimits
(((ConnectionId NtNAddr, TraceSendRecv PingPong) -> String
forall a. Show a => a -> String
show ((ConnectionId NtNAddr, TraceSendRecv PingPong) -> String)
-> (TraceSendRecv PingPong
-> (ConnectionId NtNAddr, TraceSendRecv PingPong))
-> TraceSendRecv PingPong
-> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ConnectionId NtNAddr
connId,)) (TraceSendRecv PingPong -> String)
-> Tracer m String -> Tracer m (TraceSendRecv PingPong)
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer m String
debugTracer)
Codec PingPong DeserialiseFailure m ByteString
pingPongCodec
(LimitsAndTimeouts header block
-> ProtocolSizeLimits PingPong ByteString
forall header block.
LimitsAndTimeouts header block
-> ProtocolSizeLimits PingPong ByteString
pingPongSizeLimits LimitsAndTimeouts header block
limits)
(LimitsAndTimeouts header block -> ProtocolTimeLimits PingPong
forall header block.
LimitsAndTimeouts header block -> ProtocolTimeLimits PingPong
pingPongTimeLimits LimitsAndTimeouts header block
limits)
Channel m ByteString
channel
(PingPongClient m ()
-> Peer PingPong 'AsClient 'NonPipelined 'StIdle m ()
forall (m :: * -> *) a.
Functor m =>
PingPongClient m a -> Client PingPong 'NonPipelined 'StIdle m a
pingPongClientPeer PingPongClient m ()
pingPongClient)
pingPongResponder
:: MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
pingPongResponder :: MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
pingPongResponder = (ResponderContext NtNAddr
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb ((ResponderContext NtNAddr
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb (ResponderContext NtNAddr) ByteString m ())
-> (ResponderContext NtNAddr
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
forall a b. (a -> b) -> a -> b
$
\ResponderContext { rcConnectionId :: forall addr. ResponderContext addr -> ConnectionId addr
rcConnectionId = ConnectionId NtNAddr
connId } Channel m ByteString
channel ->
Tracer m (TraceSendRecv PingPong)
-> Codec PingPong DeserialiseFailure m ByteString
-> ProtocolSizeLimits PingPong ByteString
-> ProtocolTimeLimits PingPong
-> Channel m ByteString
-> Peer PingPong 'AsServer 'NonPipelined 'StIdle m ()
-> m ((), Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
MonadTimer m, ShowProxy ps,
forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runPeerWithLimits
(((ConnectionId NtNAddr, TraceSendRecv PingPong) -> String
forall a. Show a => a -> String
show ((ConnectionId NtNAddr, TraceSendRecv PingPong) -> String)
-> (TraceSendRecv PingPong
-> (ConnectionId NtNAddr, TraceSendRecv PingPong))
-> TraceSendRecv PingPong
-> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ConnectionId NtNAddr
connId,)) (TraceSendRecv PingPong -> String)
-> Tracer m String -> Tracer m (TraceSendRecv PingPong)
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer m String
debugTracer)
Codec PingPong DeserialiseFailure m ByteString
pingPongCodec
(LimitsAndTimeouts header block
-> ProtocolSizeLimits PingPong ByteString
forall header block.
LimitsAndTimeouts header block
-> ProtocolSizeLimits PingPong ByteString
pingPongSizeLimits LimitsAndTimeouts header block
limits)
(LimitsAndTimeouts header block -> ProtocolTimeLimits PingPong
forall header block.
LimitsAndTimeouts header block -> ProtocolTimeLimits PingPong
pingPongTimeLimits LimitsAndTimeouts header block
limits)
Channel m ByteString
channel
(PingPongServer m ()
-> Peer PingPong 'AsServer 'NonPipelined 'StIdle m ()
forall (m :: * -> *) a.
Monad m =>
PingPongServer m a -> Server PingPong 'NonPipelined 'StIdle m a
pingPongServerPeer PingPongServer m ()
forall (m :: * -> *). Applicative m => PingPongServer m ()
pingPongServerStandard)
peerSharingInitiator
:: MiniProtocolCb (ExpandedInitiatorContext NtNAddr m) ByteString m ()
peerSharingInitiator :: MiniProtocolCb (ExpandedInitiatorContext NtNAddr m) ByteString m ()
peerSharingInitiator =
(ExpandedInitiatorContext NtNAddr m
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb
(ExpandedInitiatorContext NtNAddr m) ByteString m ()
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb ((ExpandedInitiatorContext NtNAddr m
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb
(ExpandedInitiatorContext NtNAddr m) ByteString m ())
-> (ExpandedInitiatorContext NtNAddr m
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb
(ExpandedInitiatorContext NtNAddr m) ByteString m ()
forall a b. (a -> b) -> a -> b
$
\ ExpandedInitiatorContext {
eicConnectionId :: forall addr (m :: * -> *).
ExpandedInitiatorContext addr m -> ConnectionId addr
eicConnectionId = connId :: ConnectionId NtNAddr
connId@ConnectionId { remoteAddress :: forall addr. ConnectionId addr -> addr
remoteAddress = NtNAddr
them },
eicControlMessage :: forall addr (m :: * -> *).
ExpandedInitiatorContext addr m -> ControlMessageSTM m
eicControlMessage = ControlMessageSTM m
controlMessageSTM
}
Channel m ByteString
channel
-> do String -> m ()
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread String
"PeerSharingClient"
PeerSharingRegistry NtNAddr m
-> NtNAddr
-> (PeerSharingController NtNAddr m -> m ((), Maybe ByteString))
-> m ((), Maybe ByteString)
forall peer (m :: * -> *) a.
(Ord peer, MonadSTM m, MonadThrow m) =>
PeerSharingRegistry peer m
-> peer -> (PeerSharingController peer m -> m a) -> m a
bracketPeerSharingClient (NodeKernel header block s m -> PeerSharingRegistry NtNAddr m
forall header block s (m :: * -> *).
NodeKernel header block s m -> PeerSharingRegistry NtNAddr m
nkPeerSharingRegistry NodeKernel header block s m
nodeKernel) NtNAddr
them
((PeerSharingController NtNAddr m -> m ((), Maybe ByteString))
-> m ((), Maybe ByteString))
-> (PeerSharingController NtNAddr m -> m ((), Maybe ByteString))
-> m ((), Maybe ByteString)
forall a b. (a -> b) -> a -> b
$ \PeerSharingController NtNAddr m
controller -> do
psClient <- ControlMessageSTM m
-> PeerSharingController NtNAddr m
-> m (PeerSharingClient NtNAddr m ())
forall (m :: * -> *) peer.
(Alternative (STM m), MonadMVar m, MonadSTM m, MonadThrow m) =>
ControlMessageSTM m
-> PeerSharingController peer m -> m (PeerSharingClient peer m ())
peerSharingClient ControlMessageSTM m
controlMessageSTM PeerSharingController NtNAddr m
controller
runPeerWithLimits
((show . (connId,)) `contramap` debugTracer)
peerSharingCodec
(peerSharingSizeLimits limits)
(peerSharingTimeLimits limits)
channel
(peerSharingClientPeer psClient)
peerSharingResponder
:: PeerSharingAPI NtNAddr s m
-> MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
peerSharingResponder :: PeerSharingAPI NtNAddr s m
-> MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
peerSharingResponder PeerSharingAPI NtNAddr s m
psAPI = (ResponderContext NtNAddr
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb ((ResponderContext NtNAddr
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb (ResponderContext NtNAddr) ByteString m ())
-> (ResponderContext NtNAddr
-> Channel m ByteString -> m ((), Maybe ByteString))
-> MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
forall a b. (a -> b) -> a -> b
$ \ResponderContext { rcConnectionId :: forall addr. ResponderContext addr -> ConnectionId addr
rcConnectionId = ConnectionId NtNAddr
connId } Channel m ByteString
channel -> do
String -> m ()
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread String
"PeerSharingServer"
Tracer m (TraceSendRecv (PeerSharing NtNAddr))
-> Codec (PeerSharing NtNAddr) DeserialiseFailure m ByteString
-> ProtocolSizeLimits (PeerSharing NtNAddr) ByteString
-> ProtocolTimeLimits (PeerSharing NtNAddr)
-> Channel m ByteString
-> Peer (PeerSharing NtNAddr) 'AsServer 'NonPipelined 'StIdle m ()
-> m ((), Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
MonadTimer m, ShowProxy ps,
forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runPeerWithLimits
(((ConnectionId NtNAddr, TraceSendRecv (PeerSharing NtNAddr))
-> String
forall a. Show a => a -> String
show ((ConnectionId NtNAddr, TraceSendRecv (PeerSharing NtNAddr))
-> String)
-> (TraceSendRecv (PeerSharing NtNAddr)
-> (ConnectionId NtNAddr, TraceSendRecv (PeerSharing NtNAddr)))
-> TraceSendRecv (PeerSharing NtNAddr)
-> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ConnectionId NtNAddr
connId,)) (TraceSendRecv (PeerSharing NtNAddr) -> String)
-> Tracer m String
-> Tracer m (TraceSendRecv (PeerSharing NtNAddr))
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer m String
debugTracer)
Codec (PeerSharing NtNAddr) DeserialiseFailure m ByteString
peerSharingCodec
(LimitsAndTimeouts header block
-> ProtocolSizeLimits (PeerSharing NtNAddr) ByteString
forall header block.
LimitsAndTimeouts header block
-> ProtocolSizeLimits (PeerSharing NtNAddr) ByteString
peerSharingSizeLimits LimitsAndTimeouts header block
limits)
(LimitsAndTimeouts header block
-> ProtocolTimeLimits (PeerSharing NtNAddr)
forall header block.
LimitsAndTimeouts header block
-> ProtocolTimeLimits (PeerSharing NtNAddr)
peerSharingTimeLimits LimitsAndTimeouts header block
limits)
Channel m ByteString
channel
(Peer (PeerSharing NtNAddr) 'AsServer 'NonPipelined 'StIdle m ()
-> m ((), Maybe ByteString))
-> Peer (PeerSharing NtNAddr) 'AsServer 'NonPipelined 'StIdle m ()
-> m ((), Maybe ByteString)
forall a b. (a -> b) -> a -> b
$ PeerSharingServer NtNAddr m
-> Peer (PeerSharing NtNAddr) 'AsServer 'NonPipelined 'StIdle m ()
forall (m :: * -> *) peerAddress.
Monad m =>
PeerSharingServer peerAddress m
-> Server (PeerSharing peerAddress) 'NonPipelined 'StIdle m ()
peerSharingServerPeer
(PeerSharingServer NtNAddr m
-> Peer (PeerSharing NtNAddr) 'AsServer 'NonPipelined 'StIdle m ())
-> PeerSharingServer NtNAddr m
-> Peer (PeerSharing NtNAddr) 'AsServer 'NonPipelined 'StIdle m ()
forall a b. (a -> b) -> a -> b
$ PeerSharingAPI NtNAddr s m -> PeerSharingServer NtNAddr m
forall (m :: * -> *) peer s.
(MonadSTM m, MonadMonotonicTime m, Hashable peer, RandomGen s) =>
PeerSharingAPI peer s m -> PeerSharingServer peer m
peerSharingServer PeerSharingAPI NtNAddr s m
psAPI
instance ShowProxy PingPong where
showProxy :: Proxy PingPong -> String
showProxy Proxy PingPong
Proxy = String
"PingPong"