{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
module DMQ.NodeToNode
( RemoteAddress
, module DMQ.NodeToNode.Version
, ClientApp
, ServerApp
, Apps (..)
, ntnApps
, Protocols (..)
, nodeToNodeProtocols
, initiatorProtocols
, initiatorAndResponderProtocols
, dmqCodecs
, LimitsAndTimeouts
, dmqLimitsAndTimeouts
, HandshakeTr
, ntnHandshakeArguments
, stdVersionDataNTN
) where
import Control.Applicative (Alternative)
import Control.Concurrent.Class.MonadMVar.Strict
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadST
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer, nullTracer)
import Codec.CBOR.Decoding qualified as CBOR
import Codec.CBOR.Encoding qualified as CBOR
import Codec.CBOR.Read qualified as CBOR
import Codec.CBOR.Term qualified as CBOR
import Data.Aeson qualified as Aeson
import Data.ByteString.Lazy qualified as BL
import Data.Functor.Contravariant ((>$<))
import Data.Hashable (Hashable)
import Data.Typeable
import Data.Void (Void)
import System.Random (mkStdGen)
import Network.Mux.Trace qualified as Mx
import Network.Mux.Types (Mode (..))
import Network.Mux.Types qualified as Mx
import Network.TypedProtocol.Codec (AnnotatedCodec, Codec)
import Cardano.KESAgent.KES.Crypto (Crypto (..))
import DMQ.Configuration (Configuration, Configuration' (..), I (..))
import DMQ.Diffusion.NodeKernel (NodeKernel (..))
import DMQ.NodeToNode.Version
import DMQ.Protocol.SigSubmission.Codec
import DMQ.Protocol.SigSubmission.Type
import DMQ.Tracer
import Ouroboros.Network.BlockFetch.ClientRegistry (bracketKeepAliveClient)
import Ouroboros.Network.Channel (Channel)
import Ouroboros.Network.ConnectionId (ConnectionId (..))
import Ouroboros.Network.Context (ExpandedInitiatorContext (..),
ResponderContext (..))
import Ouroboros.Network.DiffusionMode
import Ouroboros.Network.Driver.Limits (runAnnotatedPeerWithLimits,
runPeerWithLimits, runPipelinedAnnotatedPeerWithLimits)
import Ouroboros.Network.Driver.Simple (TraceSendRecv)
import Ouroboros.Network.Handshake.Acceptable (Acceptable (..))
import Ouroboros.Network.Handshake.Queryable (Queryable (..))
import Ouroboros.Network.KeepAlive (KeepAliveInterval (..), keepAliveClient,
keepAliveServer)
import Ouroboros.Network.Magic (NetworkMagic (..))
import Ouroboros.Network.Mux (MiniProtocol (..), MiniProtocolCb (..),
MiniProtocolLimits (..), OuroborosBundle,
OuroborosBundleWithExpandedCtx, RunMiniProtocol (..),
StartOnDemandOrEagerly (..), TemperatureBundle (..),
WithProtocolTemperature (..))
import Ouroboros.Network.PeerSelection (PeerSharing (..))
import Ouroboros.Network.PeerSharing (bracketPeerSharingClient,
peerSharingClient, peerSharingServer)
import Ouroboros.Network.Snocket (RemoteAddress)
import Ouroboros.Network.TxSubmission.Inbound.V2 as SigSubmission
import Ouroboros.Network.TxSubmission.Mempool.Simple qualified as Mempool
import Ouroboros.Network.TxSubmission.Outbound
import Ouroboros.Network.OrphanInstances ()
import Ouroboros.Network.Protocol.Handshake (Handshake, HandshakeArguments (..))
import Ouroboros.Network.Protocol.Handshake.Codec (cborTermVersionDataCodec,
codecHandshake, timeLimitsHandshake)
import Ouroboros.Network.Protocol.KeepAlive.Client (keepAliveClientPeer)
import Ouroboros.Network.Protocol.KeepAlive.Codec (byteLimitsKeepAlive,
codecKeepAlive_v2, timeLimitsKeepAlive)
import Ouroboros.Network.Protocol.KeepAlive.Server (keepAliveServerPeer)
import Ouroboros.Network.Protocol.KeepAlive.Type (KeepAlive)
import Ouroboros.Network.Protocol.Limits (ProtocolSizeLimits,
ProtocolTimeLimits)
import Ouroboros.Network.Protocol.PeerSharing.Client (peerSharingClientPeer)
import Ouroboros.Network.Protocol.PeerSharing.Codec (byteLimitsPeerSharing,
codecPeerSharing, timeLimitsPeerSharing)
import Ouroboros.Network.Protocol.PeerSharing.Server (peerSharingServerPeer)
import Ouroboros.Network.Protocol.PeerSharing.Type qualified as Protocol
import Ouroboros.Network.Protocol.TxSubmission2.Client (txSubmissionClientPeer)
import Ouroboros.Network.Protocol.TxSubmission2.Server
(txSubmissionServerPeerPipelined)
-- | Shape of a mini-protocol application run on the initiator side.
--
-- It receives a 'NodeToNodeVersion', the 'ExpandedInitiatorContext' of the
-- connection and a 'Channel' carrying lazy bytestrings, and runs in @m@.
--
-- The result is paired with a @'Maybe' 'BL.ByteString'@
-- (NOTE(review): presumably unconsumed trailing input returned by the
-- protocol driver -- confirm against the driver documentation).
type ClientApp addr m a =
     NodeToNodeVersion
  -> ExpandedInitiatorContext addr m
  -> Channel m BL.ByteString
  -> m (a, Maybe BL.ByteString)
-- | Shape of a mini-protocol application run on the responder side.
--
-- It receives a 'NodeToNodeVersion', the 'ResponderContext' of the
-- connection and a 'Channel' carrying lazy bytestrings, and runs in @m@.
--
-- The result is paired with a @'Maybe' 'BL.ByteString'@
-- (NOTE(review): presumably unconsumed trailing input returned by the
-- protocol driver -- confirm against the driver documentation).
type ServerApp addr m a =
     NodeToNodeVersion
  -> ResponderContext addr
  -> Channel m BL.ByteString
  -> m (a, Maybe BL.ByteString)
-- | Record bundling the node-to-node mini-protocol applications.
--
-- The type parameter @a@ is the result of every client ('ClientApp')
-- application, @b@ the result of every server ('ServerApp') application.
data Apps addr m a b =
  Apps {
    -- | Sig-submission application, client side.
    aSigSubmissionClient :: ClientApp addr m a

    -- | Sig-submission application, server side.
  , aSigSubmissionServer :: ServerApp addr m b

    -- | Keep-alive application, client side.
  , aKeepAliveClient     :: ClientApp addr m a

    -- | Keep-alive application, server side.
  , aKeepAliveServer     :: ServerApp addr m b

    -- | Peer-sharing application, client side.
  , aPeerSharingClient   :: ClientApp addr m a

    -- | Peer-sharing application, server side.
  , aPeerSharingServer   :: ServerApp addr m b
  }
-- | Assemble the node-to-node 'Apps' record.
--
-- Arguments, in order:
--
-- * a tracer polymorphic in the event type; it is only used for a given
--   event source when the corresponding 'Configuration' flag is 'True',
--   otherwise 'nullTracer' is used;
-- * the 'Configuration', from which only the per-protocol tracer flags are
--   read (each one is unwrapped from 'I');
-- * the 'NodeKernel', from which the registries, the mempool and the
--   sig-submission shared state are taken;
-- * the 'Codecs' for the three mini-protocols;
-- * the 'LimitsAndTimeouts' for the three mini-protocols;
-- * the 'TxDecisionPolicy' handed to the sig-submission server.
--
-- All six applications return @()@.  The applications themselves are
-- defined in the @where@ clause that follows this equation.
ntnApps
  :: forall crypto m addr .
     ( Crypto crypto
     , Typeable crypto
     , Alternative (STM m)
     , MonadAsync m
     , MonadDelay m
     , MonadFork m
     , MonadMask m
     , MonadMVar m
     , MonadThrow (STM m)
     , MonadTimer m
     , Ord addr
     , Show addr
     , Hashable addr
     , Aeson.ToJSON addr
     )
  => (forall ev. Aeson.ToJSON ev => Tracer m (WithEventType ev))
  -> Configuration
  -> NodeKernel crypto addr m
  -> Codecs crypto addr m
  -> LimitsAndTimeouts crypto addr
  -> TxDecisionPolicy
  -> Apps addr m () ()
ntnApps
    tracer
    -- Only the tracer on/off switches are needed from the configuration.
    Configuration {
      dmqcSigSubmissionClientProtocolTracer = I sigSubmissionClientProtocolTracer
    , dmqcSigSubmissionServerProtocolTracer = I sigSubmissionServerProtocolTracer
    , dmqcKeepAliveClientProtocolTracer     = I keepAliveClientProtocolTracer
    , dmqcKeepAliveServerProtocolTracer     = I keepAliveServerProtocolTracer
    , dmqcPeerSharingClientProtocolTracer   = I peerSharingClientProtocolTracer
    , dmqcPeerSharingServerProtocolTracer   = I peerSharingServerProtocolTracer
    , dmqcSigSubmissionOutboundTracer       = I sigSubmissionOutboundTracer
    , dmqcSigSubmissionInboundTracer        = I sigSubmissionInboundTracer
    , dmqcSigSubmissionLogicTracer          = I sigSubmissionLogicTracer
    }
    -- Field puns (NamedFieldPuns): each field is bound under its own name.
    NodeKernel {
      fetchClientRegistry
    , peerSharingRegistry
    , peerSharingAPI
    , mempool
    , sigChannelVar
    , sigMempoolSem
    , sigSharedTxStateVar
    }
    Codecs {
      sigSubmissionCodec
    , keepAliveCodec
    , peerSharingCodec
    }
    LimitsAndTimeouts {
      sigSubmissionSizeLimits
    , sigSubmissionTimeLimits
    , keepAliveSizeLimits
    , keepAliveTimeLimits
    , peerSharingTimeLimits
    , peerSharingSizeLimits
    }
    sigDecisionPolicy
    =
    -- Field puns again: every field is filled from the where-binding of
    -- the same name.
    Apps {
      aSigSubmissionClient
    , aSigSubmissionServer
    , aKeepAliveClient
    , aKeepAliveServer
    , aPeerSharingClient
    , aPeerSharingServer
    }
where
-- | Size reported for a signature.  Local to the @where@ clause of
-- 'ntnApps'.
--
-- The argument is ignored and the result is always @0@.
-- NOTE(review): this looks like a placeholder -- a constant zero means
-- any size-based accounting downstream sees every signature as empty;
-- confirm whether a real size should be computed here.
sigSize :: Sig crypto -> SizeInBytes
sigSize _ = 0
-- | Mempool reader used by the sig-submission applications.  Local to the
-- @where@ clause of 'ntnApps'.
--
-- Built with 'Mempool.getReader' from the signature id projection
-- ('sigId'), the size function ('sigSize') and the @mempool@ bound from
-- the 'NodeKernel' argument of 'ntnApps'.
--
-- Inferred type:
-- @TxSubmissionMempoolReader SigId (Sig crypto) Int m@.
mempoolReader = Mempool.getReader sigId sigSize mempool
-- | Mempool writer used by the sig-submission server.  Local to the
-- @where@ clause of 'ntnApps'.
--
-- Built with 'Mempool.getWriter' on the @mempool@ bound from the
-- 'NodeKernel' argument of 'ntnApps'.
--
-- Inferred type:
-- @TxSubmissionMempoolWriter SigId (Sig crypto) Int m@.
--
-- NOTE(review): because the check below always succeeds, no signature is
-- rejected at this point -- confirm that validation happens elsewhere.
mempoolWriter =
  Mempool.getWriter
    sigId
    -- The context action yields unit: the check below needs no context.
    (pure ())
    -- The check ignores both the context and the signature and always
    -- returns 'Right'.
    (\_ _ -> Right () :: Either Void ())
    -- The failure type is 'Void', so no failure value can be constructed
    -- and passed to this predicate.
    (\_ -> True)
    mempool
-- | Sig-submission client application.  Local to the @where@ clause of
-- 'ntnApps'; it uses the tracer, flags, codec and limits bound there.
--
-- Runs 'txSubmissionOutbound' (reading from 'mempoolReader') as a
-- 'txSubmissionClientPeer' through 'runAnnotatedPeerWithLimits' over the
-- given channel, using the sig-submission codec, size limits and time
-- limits.
--
-- Two tracers are built, each tagging events with an event-type string
-- and the connection id, and each replaced by 'nullTracer' when its
-- configuration flag is off:
--
-- * @"SigSubmission.Protocol.Client"@ for protocol send\/receive events;
-- * @"SigSubmission.Outbound"@ for outbound events.
aSigSubmissionClient
  :: NodeToNodeVersion
  -> ExpandedInitiatorContext addr m
  -> Channel m BL.ByteString
  -> m ((), Maybe BL.ByteString)
aSigSubmissionClient version
                     ExpandedInitiatorContext {
                       eicConnectionId   = connId,
                       eicControlMessage = controlMessage
                     } channel =
  runAnnotatedPeerWithLimits
    (if sigSubmissionClientProtocolTracer
      then WithEventType "SigSubmission.Protocol.Client" . Mx.WithBearer connId >$< tracer
      else nullTracer)
    sigSubmissionCodec
    sigSubmissionSizeLimits
    sigSubmissionTimeLimits
    channel
    $ txSubmissionClientPeer
    $ txSubmissionOutbound
        (if sigSubmissionOutboundTracer
          then WithEventType "SigSubmission.Outbound" . Mx.WithBearer connId >$< tracer
          else nullTracer)
        _MAX_SIGS_TO_ACK
        mempoolReader
        version
        controlMessage
-- | Sig-submission server application.  Local to the @where@ clause of
-- 'ntnApps'; it uses the tracer, flags, codec, limits, shared state and
-- decision policy bound there.
--
-- The version argument is ignored.
--
-- 'SigSubmission.withPeer' is called with the remote address of the
-- connection and the shared sig-submission state (channels var, mempool
-- semaphore, decision policy, shared state var, mempool reader\/writer
-- and 'sigSize').  The peer API it passes to the continuation is handed
-- to 'txSubmissionInboundV2', which is run as a
-- 'txSubmissionServerPeerPipelined' through
-- 'runPipelinedAnnotatedPeerWithLimits' over the given channel.
--
-- Three tracers are built, each tagging events with an event-type string
-- and the connection id, and each replaced by 'nullTracer' when its
-- configuration flag is off:
--
-- * @"SigSubmission.Logic"@ for the events traced by 'withPeer';
-- * @"SigSubmission.Protocol.Server"@ for protocol send\/receive events;
-- * @"SigSubmission.Inbound"@ for inbound events.
aSigSubmissionServer
  :: NodeToNodeVersion
  -> ResponderContext addr
  -> Channel m BL.ByteString
  -> m ((), Maybe BL.ByteString)
aSigSubmissionServer _version ResponderContext { rcConnectionId = connId } channel =
  SigSubmission.withPeer
    (if sigSubmissionLogicTracer
      then WithEventType "SigSubmission.Logic" . Mx.WithBearer connId >$< tracer
      else nullTracer)
    sigChannelVar
    sigMempoolSem
    sigDecisionPolicy
    sigSharedTxStateVar
    mempoolReader
    mempoolWriter
    sigSize
    (remoteAddress connId)
    $ \(peerSigAPI :: PeerTxAPI m SigId (Sig crypto)) ->
      runPipelinedAnnotatedPeerWithLimits
        (if sigSubmissionServerProtocolTracer
          then WithEventType "SigSubmission.Protocol.Server" . Mx.WithBearer connId >$< tracer
          else nullTracer)
        sigSubmissionCodec
        sigSubmissionSizeLimits
        sigSubmissionTimeLimits
        channel
        $ txSubmissionServerPeerPipelined
        $ txSubmissionInboundV2
            (if sigSubmissionInboundTracer
              then WithEventType "SigSubmission.Inbound" . Mx.WithBearer connId >$< tracer
              else nullTracer)
            _SIG_SUBMISSION_INIT_DELAY
            mempoolWriter
            peerSigAPI
-- | Initiator side of the KeepAlive mini-protocol.
--
-- Labels the current thread @"KeepAlive.Client"@ and runs the keep-alive
-- client peer over @channel@ with the keep-alive codec, size limits and time
-- limits.  The peer is run inside 'bracketKeepAliveClient', which supplies the
-- @StrictTVar m (Map (ConnectionId addr) PeerGSV)@ passed on to
-- 'keepAliveClient'.
--
-- The protocol tracer is only attached when @keepAliveClientProtocolTracer@
-- is set; otherwise 'nullTracer' is used.
--
-- This is a local binding: @tracer@, @keepAliveClientProtocolTracer@,
-- @keepAliveCodec@, @keepAliveSizeLimits@, @keepAliveTimeLimits@ and
-- @fetchClientRegistry@ are bound by the enclosing definition.
aKeepAliveClient
  :: NodeToNodeVersion
  -> ExpandedInitiatorContext addr m
  -> Channel m BL.ByteString
  -> m ((), Maybe BL.ByteString)
aKeepAliveClient _version
                 ExpandedInitiatorContext {
                   eicConnectionId   = connId
                 , eicControlMessage = controlMessageSTM
                 }
                 channel = do
  labelThisThread "KeepAlive.Client"
  let -- Runs the client peer once the registry has handed us the shared
      -- 'PeerGSV' map.
      kacApp dqCtx =
        runPeerWithLimits
          (if keepAliveClientProtocolTracer
             then WithEventType "KeepAlive.Protocol.Client" . Mx.WithBearer connId >$< tracer
             else nullTracer)
          keepAliveCodec
          keepAliveSizeLimits
          keepAliveTimeLimits
          channel
          $ keepAliveClientPeer
          -- NOTE(review): the generator is always seeded with 0, so every
          -- client starts from the same pseudo-random sequence; confirm this
          -- is intended.  The interval is the 'DiffTime' literal 10.
          $ keepAliveClient nullTracer
                            (mkStdGen 0)
                            controlMessageSTM
                            connId
                            dqCtx
                            (KeepAliveInterval 10)

  ((), trailing) <- bracketKeepAliveClient fetchClientRegistry connId kacApp
  return ((), trailing)
-- | Responder side of the KeepAlive mini-protocol.
--
-- Labels the current thread @"KeepAlive.Server"@ and runs
-- @keepAliveServerPeer keepAliveServer@ over @channel@ with the keep-alive
-- codec, size limits and time limits.
--
-- The protocol tracer is only attached when @keepAliveServerProtocolTracer@
-- is set; otherwise 'nullTracer' is used.
--
-- This is a local binding: @tracer@, @keepAliveServerProtocolTracer@,
-- @keepAliveCodec@, @keepAliveSizeLimits@ and @keepAliveTimeLimits@ are bound
-- by the enclosing definition.
aKeepAliveServer
  :: NodeToNodeVersion
  -> ResponderContext addr
  -> Channel m BL.ByteString
  -> m ((), Maybe BL.ByteString)
aKeepAliveServer _version
                 ResponderContext {
                   rcConnectionId = connId
                 }
                 channel = do
  labelThisThread "KeepAlive.Server"
  runPeerWithLimits
    (if keepAliveServerProtocolTracer
       then WithEventType "KeepAlive.Protocol.Server" . Mx.WithBearer connId >$< tracer
       else nullTracer)
    keepAliveCodec
    keepAliveSizeLimits
    keepAliveTimeLimits
    channel
    $ keepAliveServerPeer
      keepAliveServer
-- | Initiator side of the PeerSharing mini-protocol.
--
-- Labels the current thread @"PeerSharing.Client"@, then uses
-- 'bracketPeerSharingClient' keyed on the connection's remote address to
-- obtain a controller.  With that controller it builds the client via
-- 'peerSharingClient' and runs it over @channel@ with the peer-sharing codec,
-- size limits and time limits.
--
-- The protocol tracer is only attached when @peerSharingClientProtocolTracer@
-- is set; otherwise 'nullTracer' is used.
--
-- This is a local binding: @tracer@, @peerSharingClientProtocolTracer@,
-- @peerSharingRegistry@, @peerSharingCodec@, @peerSharingSizeLimits@ and
-- @peerSharingTimeLimits@ are bound by the enclosing definition.
aPeerSharingClient
  :: NodeToNodeVersion
  -> ExpandedInitiatorContext addr m
  -> Channel m BL.ByteString
  -> m ((), Maybe BL.ByteString)
aPeerSharingClient _version
                   ExpandedInitiatorContext {
                     eicConnectionId   = connId
                   , eicControlMessage = controlMessageSTM
                   }
                   channel = do
  labelThisThread "PeerSharing.Client"
  bracketPeerSharingClient peerSharingRegistry (remoteAddress connId)
    $ \controller -> do
      psClient <- peerSharingClient controlMessageSTM controller
      ((), trailing) <- runPeerWithLimits
        (if peerSharingClientProtocolTracer
           then WithEventType "PeerSharing.Protocol.Client" . Mx.WithBearer connId >$< tracer
           else nullTracer)
        peerSharingCodec
        peerSharingSizeLimits
        peerSharingTimeLimits
        channel
        (peerSharingClientPeer psClient)
      return ((), trailing)
-- | Responder side of the PeerSharing mini-protocol.
--
-- Labels the current thread @"PeerSharing.Server"@ and runs
-- @peerSharingServerPeer (peerSharingServer peerSharingAPI)@ over @channel@
-- with the peer-sharing codec, size limits and time limits.
--
-- The protocol tracer is only attached when @peerSharingServerProtocolTracer@
-- is set; otherwise 'nullTracer' is used.
--
-- This is a local binding: @tracer@, @peerSharingServerProtocolTracer@,
-- @peerSharingAPI@, @peerSharingCodec@, @peerSharingSizeLimits@ and
-- @peerSharingTimeLimits@ are bound by the enclosing definition.
aPeerSharingServer
  :: NodeToNodeVersion
  -> ResponderContext addr
  -> Channel m BL.ByteString
  -> m ((), Maybe BL.ByteString)
aPeerSharingServer _version
                   ResponderContext {
                     rcConnectionId = connId
                   }
                   channel = do
  labelThisThread "PeerSharing.Server"
  runPeerWithLimits
    (if peerSharingServerProtocolTracer
       then WithEventType "PeerSharing.Protocol.Server" . Mx.WithBearer connId >$< tracer
       else nullTracer)
    peerSharingCodec
    peerSharingSizeLimits
    peerSharingTimeLimits
    channel
    $ peerSharingServerPeer
    $ peerSharingServer peerSharingAPI
-- | The set of mini-protocol runners used on a node-to-node connection, one
-- 'RunMiniProtocol' per mini-protocol.  All three fields share the same mode,
-- contexts, byte type, monad and result types.
data Protocols appType initiatorCtx responderCtx bytes m a b =
  Protocols {
    -- | Runner for the sig-submission mini-protocol.
    sigSubmissionProtocol :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b

    -- | Runner for the keep-alive mini-protocol.
  , keepAliveProtocol     :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b

    -- | Runner for the peer-sharing mini-protocol.
  , peerSharingProtocol   :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
  }
-- | Mux mini-protocol number assigned to the sig-submission mini-protocol.
sigSubmissionMiniProtocolNum :: Mx.MiniProtocolNum
sigSubmissionMiniProtocolNum = Mx.MiniProtocolNum 11
-- | Mux mini-protocol number assigned to the keep-alive mini-protocol.
keepAliveMiniProtocolNum :: Mx.MiniProtocolNum
keepAliveMiniProtocolNum = Mx.MiniProtocolNum 12
-- | Mux mini-protocol number assigned to the peer-sharing mini-protocol.
peerSharingMiniProtocolNum :: Mx.MiniProtocolNum
peerSharingMiniProtocolNum = Mx.MiniProtocolNum 13
-- | Assemble the node-to-node mini-protocol bundle.
--
-- * Hot: the sig-submission mini-protocol.
-- * Warm: no mini-protocols.
-- * Established: the keep-alive mini-protocol, followed by the peer-sharing
--   mini-protocol only when the negotiated version data has
--   'PeerSharingEnabled'.
--
-- Each 'MiniProtocol' takes its limits from the supplied 'LimitsAndTimeouts'
-- and its runner from the supplied 'Protocols'.  The 'NodeToNodeVersion'
-- argument is currently ignored.
nodeToNodeProtocols
  :: LimitsAndTimeouts crypto addr
  -> Protocols appType initiatorCtx responderCtx bytes m a b
  -> NodeToNodeVersion
  -> NodeToNodeVersionData
  -> OuroborosBundle appType initiatorCtx responderCtx bytes m a b
nodeToNodeProtocols LimitsAndTimeouts {
                      sigSubmissionLimits
                    , keepAliveLimits
                    , peerSharingLimits
                    }
                    Protocols {
                      sigSubmissionProtocol
                    , keepAliveProtocol
                    , peerSharingProtocol
                    }
                    _version
                    NodeToNodeVersionData {
                      peerSharing
                    }
                    =
  TemperatureBundle
    -- Hot protocols: sig-submission.
    (WithHot [
        MiniProtocol {
          miniProtocolNum    = sigSubmissionMiniProtocolNum
        , miniProtocolStart  = StartOnDemandAny
        , miniProtocolLimits = sigSubmissionLimits
        , miniProtocolRun    = sigSubmissionProtocol
        }
      ])

    -- Warm protocols: none.
    (WithWarm [])

    -- Established protocols: keep-alive and, optionally, peer-sharing.
    (WithEstablished $
        MiniProtocol {
          miniProtocolNum    = keepAliveMiniProtocolNum
        , miniProtocolStart  = StartOnDemandAny
        , miniProtocolLimits = keepAliveLimits
        , miniProtocolRun    = keepAliveProtocol
        }
      : case peerSharing of
          PeerSharingEnabled ->
            [ MiniProtocol {
                miniProtocolNum    = peerSharingMiniProtocolNum
              , miniProtocolStart  = StartOnDemand
              , miniProtocolLimits = peerSharingLimits
              , miniProtocolRun    = peerSharingProtocol
              }
            ]
          PeerSharingDisabled ->
            []
    )
-- | Node-to-node bundle for initiator-only connections.
--
-- Wraps the three client applications from 'Apps' (each applied to the
-- negotiated @version@) in 'InitiatorProtocolOnly' and hands them to
-- 'nodeToNodeProtocols'.  The 'NodeToNodeVersionData' argument is passed
-- through to 'nodeToNodeProtocols' by partial application.
initiatorProtocols
  :: LimitsAndTimeouts crypto addr
  -> Apps addr m a b
  -> NodeToNodeVersion
  -> NodeToNodeVersionData
  -> OuroborosBundleWithExpandedCtx 'InitiatorMode addr BL.ByteString m a Void
initiatorProtocols limitsAndTimeouts
                   Apps {
                     aSigSubmissionClient
                   , aKeepAliveClient
                   , aPeerSharingClient
                   }
                   version =
  nodeToNodeProtocols
    limitsAndTimeouts
    (Protocols {
      sigSubmissionProtocol =
        InitiatorProtocolOnly (MiniProtocolCb (aSigSubmissionClient version))
    , keepAliveProtocol =
        InitiatorProtocolOnly (MiniProtocolCb (aKeepAliveClient version))
    , peerSharingProtocol =
        InitiatorProtocolOnly (MiniProtocolCb (aPeerSharingClient version))
    })
    version
-- | Node-to-node bundle for connections that run both initiator and
-- responder sides.
--
-- Pairs each client application from 'Apps' with its matching server
-- application (both applied to the negotiated @version@) using
-- 'InitiatorAndResponderProtocol', and hands the result to
-- 'nodeToNodeProtocols'.  The 'NodeToNodeVersionData' argument is passed
-- through to 'nodeToNodeProtocols' by partial application.
initiatorAndResponderProtocols
  :: LimitsAndTimeouts crypto addr
  -> Apps addr m a b
  -> NodeToNodeVersion
  -> NodeToNodeVersionData
  -> OuroborosBundleWithExpandedCtx 'InitiatorResponderMode addr BL.ByteString m a b
initiatorAndResponderProtocols limitsAndTimeouts
                               Apps {
                                 aSigSubmissionClient
                               , aSigSubmissionServer
                               , aKeepAliveClient
                               , aKeepAliveServer
                               , aPeerSharingClient
                               , aPeerSharingServer
                               }
                               version =
  nodeToNodeProtocols
    limitsAndTimeouts
    (Protocols {
      sigSubmissionProtocol =
        InitiatorAndResponderProtocol
          (MiniProtocolCb (aSigSubmissionClient version))
          (MiniProtocolCb (aSigSubmissionServer version))
    , keepAliveProtocol =
        InitiatorAndResponderProtocol
          (MiniProtocolCb (aKeepAliveClient version))
          (MiniProtocolCb (aKeepAliveServer version))
    , peerSharingProtocol =
        InitiatorAndResponderProtocol
          (MiniProtocolCb (aPeerSharingClient version))
          (MiniProtocolCb (aPeerSharingServer version))
    })
    version
-- | Codecs for the node-to-node mini-protocols.  Every codec reports
-- failures as 'CBOR.DeserialiseFailure' and works over lazy 'BL.ByteString's.
data Codecs crypto addr m =
  Codecs {
    -- | Annotated codec for the sig-submission mini-protocol.
    sigSubmissionCodec :: AnnotatedCodec (SigSubmission crypto)
                            CBOR.DeserialiseFailure m BL.ByteString

    -- | Codec for the keep-alive mini-protocol.
  , keepAliveCodec     :: Codec KeepAlive
                            CBOR.DeserialiseFailure m BL.ByteString

    -- | Codec for the peer-sharing mini-protocol over addresses of type
    -- @addr@.
  , peerSharingCodec   :: Codec (Protocol.PeerSharing addr)
                            CBOR.DeserialiseFailure m BL.ByteString
  }
-- | Build the 'Codecs' record.
--
-- The address encoder and decoder are passed to 'codecPeerSharing' only; the
-- sig-submission and keep-alive codecs do not depend on the address type.
--
dmqCodecs :: ( Crypto crypto
             , MonadST m
             )
          => (addr -> CBOR.Encoding)
          -- ^ address encoder
          -> (forall s. CBOR.Decoder s addr)
          -- ^ address decoder
          -> Codecs crypto addr m
dmqCodecs encodeAddr decodeAddr =
  Codecs {
    sigSubmissionCodec = codecSigSubmission
  , keepAliveCodec     = codecKeepAlive_v2
  , peerSharingCodec   = codecPeerSharing encodeAddr decodeAddr
  }
-- | Per-protocol limits: for each of sig-submission, keep-alive and
-- peer-sharing there is a 'MiniProtocolLimits' value, a 'ProtocolSizeLimits'
-- value (over lazy 'BL.ByteString's) and a 'ProtocolTimeLimits' value.
--
data LimitsAndTimeouts crypto addr =
  LimitsAndTimeouts {
    -- sig-submission
    sigSubmissionLimits
      :: MiniProtocolLimits
  , sigSubmissionSizeLimits
      :: ProtocolSizeLimits (SigSubmission crypto) BL.ByteString
  , sigSubmissionTimeLimits
      :: ProtocolTimeLimits (SigSubmission crypto)

    -- keep-alive
  , keepAliveLimits
      :: MiniProtocolLimits
  , keepAliveSizeLimits
      :: ProtocolSizeLimits KeepAlive BL.ByteString
  , keepAliveTimeLimits
      :: ProtocolTimeLimits KeepAlive

    -- peer-sharing
  , peerSharingLimits
      :: MiniProtocolLimits
  , peerSharingTimeLimits
      :: ProtocolTimeLimits (Protocol.PeerSharing addr)
  , peerSharingSizeLimits
      :: ProtocolSizeLimits (Protocol.PeerSharing addr) BL.ByteString
  }
-- | The 'LimitsAndTimeouts' value used by this module.
--
-- Size limits are measured with the local 'size' helper, i.e. the length of
-- the lazy 'BL.ByteString' converted to 'Word'.
--
dmqLimitsAndTimeouts :: LimitsAndTimeouts crypto addr
dmqLimitsAndTimeouts =
    LimitsAndTimeouts {
      -- sig-submission
      sigSubmissionLimits =
        MiniProtocolLimits {
          -- NOTE(review): 'maxBound' means the ingress queue is effectively
          -- unbounded for this protocol; confirm that this is intended.
          maximumIngressQueue = maxBound
        }
    , sigSubmissionTimeLimits = timeLimitsSigSubmission
    , sigSubmissionSizeLimits = byteLimitsSigSubmission size

      -- keep-alive
    , keepAliveLimits =
        MiniProtocolLimits {
          -- 1280 plus the margin added by 'addSafetyMargin'
          maximumIngressQueue = addSafetyMargin 1280
        }
    , keepAliveTimeLimits = timeLimitsKeepAlive
    , keepAliveSizeLimits = byteLimitsKeepAlive size

      -- peer-sharing
    , peerSharingLimits =
        MiniProtocolLimits {
          maximumIngressQueue = 4 * 1440
        }
    , peerSharingTimeLimits = timeLimitsPeerSharing
    , peerSharingSizeLimits = byteLimitsPeerSharing size
    }
  where
    -- Length of a lazy bytestring ('BL.length' returns 'Int64') as a 'Word'.
    size :: BL.ByteString -> Word
    size = fromIntegral . BL.length
-- | Type of handshake trace events: 'TraceSendRecv' of the 'Handshake'
-- protocol (versions are 'NodeToNodeVersion', version data is a 'CBOR.Term')
-- wrapped in 'Mx.WithBearer' together with the 'ConnectionId'.
--
type HandshakeTr ntnAddr =
  Mx.WithBearer
    (ConnectionId ntnAddr)
    (TraceSendRecv (Handshake NodeToNodeVersion CBOR.Term))
-- | 'HandshakeArguments' for node-to-node connections.
--
-- Only the handshake tracer is configurable; the bearer tracer is
-- 'nullTracer', so bearer traces are discarded.
--
ntnHandshakeArguments
  :: MonadST m
  => Tracer m (HandshakeTr ntnAddr)
  -- ^ tracer for handshake messages
  -> HandshakeArguments
      (ConnectionId ntnAddr)
      NodeToNodeVersion
      NodeToNodeVersionData
      m
ntnHandshakeArguments tracer =
  HandshakeArguments {
    haHandshakeTracer  = tracer
  , haBearerTracer     = nullTracer
  , haHandshakeCodec   = codecHandshake nodeToNodeVersionCodec
  , haVersionDataCodec =
      cborTermVersionDataCodec
        nodeToNodeCodecCBORTerm
  , haAcceptVersion    = acceptableVersion
  , haQueryVersion     = queryVersion
  , haTimeLimits       = timeLimitsHandshake
  }
-- | Build 'NodeToNodeVersionData' from the given network magic, diffusion
-- mode and peer-sharing setting.  The @query@ field is always 'False'.
--
stdVersionDataNTN :: NetworkMagic
                  -> DiffusionMode
                  -> PeerSharing
                  -> NodeToNodeVersionData
stdVersionDataNTN networkMagic diffusionMode peerSharing =
  NodeToNodeVersionData
    { networkMagic
    , diffusionMode
    , peerSharing
    , query = False
    }
-- | The constant @20@ as a 'NumTxIdsToAck'.
--
-- NOTE(review): presumably the maximum number of signature ids acknowledged
-- in one request; confirm at the use site.
--
_MAX_SIGS_TO_ACK :: NumTxIdsToAck
_MAX_SIGS_TO_ACK = 20
-- | Initial delay setting for sig-submission: 'NoTxSubmissionInitDelay'.
--
_SIG_SUBMISSION_INIT_DELAY :: TxSubmissionInitDelay
_SIG_SUBMISSION_INIT_DELAY = NoTxSubmissionInitDelay
-- | Add a 10% margin to a value: @x + x \`div\` 10@.
--
-- The margin is computed with integer division, so it is rounded down
-- (e.g. @addSafetyMargin 9 == 9@ and @addSafetyMargin 1280 == 1408@).
--
addSafetyMargin :: Int -> Int
addSafetyMargin x = x + (x `div` 10)