{-# LANGUAGE DataKinds           #-}
{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE OverloadedStrings   #-}
{-# LANGUAGE RankNTypes          #-}
{-# LANGUAGE ScopedTypeVariables #-}

module DMQ.NodeToNode
  ( RemoteAddress
  , module DMQ.NodeToNode.Version
  , ClientApp
  , ServerApp
  , Apps (..)
  , ntnApps
  , Protocols (..)
  , nodeToNodeProtocols
  , initiatorProtocols
  , initiatorAndResponderProtocols
  , dmqCodecs
  , LimitsAndTimeouts
  , dmqLimitsAndTimeouts
  , HandshakeTr
  , ntnHandshakeArguments
  , stdVersionDataNTN
  ) where


import Control.Applicative (Alternative)
import Control.Concurrent.Class.MonadMVar.Strict
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadST
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer, nullTracer)

import Codec.CBOR.Decoding qualified as CBOR
import Codec.CBOR.Encoding qualified as CBOR
import Codec.CBOR.Read qualified as CBOR
import Codec.CBOR.Term qualified as CBOR
import Data.Aeson qualified as Aeson
import Data.ByteString.Lazy qualified as BL
import Data.Functor.Contravariant ((>$<))
import Data.Hashable (Hashable)
import Data.Typeable
import Data.Void (Void)
import System.Random (mkStdGen)

import Network.Mux.Trace qualified as Mx
import Network.Mux.Types (Mode (..))
import Network.Mux.Types qualified as Mx
import Network.TypedProtocol.Codec (AnnotatedCodec, Codec)

import Cardano.KESAgent.KES.Crypto (Crypto (..))

import DMQ.Configuration (Configuration, Configuration' (..), I (..))
import DMQ.Diffusion.NodeKernel (NodeKernel (..))
import DMQ.NodeToNode.Version
import DMQ.Protocol.SigSubmission.Codec
import DMQ.Protocol.SigSubmission.Type
import DMQ.Tracer

import Ouroboros.Network.BlockFetch.ClientRegistry (bracketKeepAliveClient)
import Ouroboros.Network.Channel (Channel)
import Ouroboros.Network.ConnectionId (ConnectionId (..))
import Ouroboros.Network.Context (ExpandedInitiatorContext (..),
           ResponderContext (..))
import Ouroboros.Network.DiffusionMode
import Ouroboros.Network.Driver.Limits (runAnnotatedPeerWithLimits,
           runPeerWithLimits, runPipelinedAnnotatedPeerWithLimits)
import Ouroboros.Network.Driver.Simple (TraceSendRecv)
import Ouroboros.Network.Handshake.Acceptable (Acceptable (..))
import Ouroboros.Network.Handshake.Queryable (Queryable (..))
import Ouroboros.Network.KeepAlive (KeepAliveInterval (..), keepAliveClient,
           keepAliveServer)
import Ouroboros.Network.Magic (NetworkMagic (..))
import Ouroboros.Network.Mux (MiniProtocol (..), MiniProtocolCb (..),
           MiniProtocolLimits (..), OuroborosBundle,
           OuroborosBundleWithExpandedCtx, RunMiniProtocol (..),
           StartOnDemandOrEagerly (..), TemperatureBundle (..),
           WithProtocolTemperature (..))
import Ouroboros.Network.PeerSelection (PeerSharing (..))
import Ouroboros.Network.PeerSharing (bracketPeerSharingClient,
           peerSharingClient, peerSharingServer)
import Ouroboros.Network.Snocket (RemoteAddress)
import Ouroboros.Network.TxSubmission.Inbound.V2 as SigSubmission
import Ouroboros.Network.TxSubmission.Mempool.Simple qualified as Mempool
import Ouroboros.Network.TxSubmission.Outbound

import Ouroboros.Network.OrphanInstances ()

import Ouroboros.Network.Protocol.Handshake (Handshake, HandshakeArguments (..))
import Ouroboros.Network.Protocol.Handshake.Codec (cborTermVersionDataCodec,
           codecHandshake, timeLimitsHandshake)
import Ouroboros.Network.Protocol.KeepAlive.Client (keepAliveClientPeer)
import Ouroboros.Network.Protocol.KeepAlive.Codec (byteLimitsKeepAlive,
           codecKeepAlive_v2, timeLimitsKeepAlive)
import Ouroboros.Network.Protocol.KeepAlive.Server (keepAliveServerPeer)
import Ouroboros.Network.Protocol.KeepAlive.Type (KeepAlive)
import Ouroboros.Network.Protocol.Limits (ProtocolSizeLimits,
           ProtocolTimeLimits)
import Ouroboros.Network.Protocol.PeerSharing.Client (peerSharingClientPeer)
import Ouroboros.Network.Protocol.PeerSharing.Codec (byteLimitsPeerSharing,
           codecPeerSharing, timeLimitsPeerSharing)
import Ouroboros.Network.Protocol.PeerSharing.Server (peerSharingServerPeer)
import Ouroboros.Network.Protocol.PeerSharing.Type qualified as Protocol
import Ouroboros.Network.Protocol.TxSubmission2.Client (txSubmissionClientPeer)
import Ouroboros.Network.Protocol.TxSubmission2.Server
           (txSubmissionServerPeerPipelined)

-- TODO: if we add `versionNumber` to `ctx` we could use `RunMiniProtocolCb`.
-- This makes sense, since `ctx` already contains `versionData`.
type ClientApp addr m a =
     NodeToNodeVersion
  -> ExpandedInitiatorContext addr m
  -> Channel m BL.ByteString
  -> m (a, Maybe BL.ByteString)

type ServerApp addr m a =
     NodeToNodeVersion
  -> ResponderContext addr
  -> Channel m BL.ByteString
  -> m (a, Maybe BL.ByteString)

data Apps addr m a b =
  Apps {
    -- | Start a sig-submission client
    forall addr (m :: * -> *) a b.
Apps addr m a b -> ClientApp addr m a
aSigSubmissionClient :: ClientApp addr m a

    -- | Start a sig-submission server
  , forall addr (m :: * -> *) a b.
Apps addr m a b -> ServerApp addr m b
aSigSubmissionServer :: ServerApp addr m b

    -- | Start a keep-alive client.
  , forall addr (m :: * -> *) a b.
Apps addr m a b -> ClientApp addr m a
aKeepAliveClient     :: ClientApp addr m a

    -- | Start a keep-alive server.
  , forall addr (m :: * -> *) a b.
Apps addr m a b -> ServerApp addr m b
aKeepAliveServer     :: ServerApp addr m b

    -- | Start a peer-sharing client.
  , forall addr (m :: * -> *) a b.
Apps addr m a b -> ClientApp addr m a
aPeerSharingClient   :: ClientApp addr m a

    -- | Start a peer-sharing server.
  , forall addr (m :: * -> *) a b.
Apps addr m a b -> ServerApp addr m b
aPeerSharingServer   :: ServerApp addr m b
  }

ntnApps
  :: forall crypto m addr .
    ( Crypto crypto
    , Typeable crypto
    , Alternative (STM m)
    , MonadAsync m
    , MonadDelay m
    , MonadFork m
    , MonadMask m
    , MonadMVar m
    , MonadThrow (STM m)
    , MonadTimer m
    , Ord addr
    , Show addr
    , Hashable addr
    , Aeson.ToJSON addr
    )
 => (forall ev. Aeson.ToJSON ev => Tracer m (WithEventType ev))
 -> Configuration
 -> NodeKernel crypto addr m
 -> Codecs crypto addr m
 -> LimitsAndTimeouts crypto addr
 -> TxDecisionPolicy
 -> Apps addr m () ()
ntnApps :: forall crypto (m :: * -> *) addr.
(Crypto crypto, Typeable crypto, Alternative (STM m), MonadAsync m,
 MonadDelay m, MonadFork m, MonadMask m, MonadMVar m,
 MonadThrow (STM m), MonadTimer m, Ord addr, Show addr,
 Hashable addr, ToJSON addr) =>
(forall ev. ToJSON ev => Tracer m (WithEventType ev))
-> Configuration
-> NodeKernel crypto addr m
-> Codecs crypto addr m
-> LimitsAndTimeouts crypto addr
-> TxDecisionPolicy
-> Apps addr m () ()
ntnApps
    forall ev. ToJSON ev => Tracer m (WithEventType ev)
tracer
    Configuration {
      dmqcSigSubmissionClientProtocolTracer :: forall (f :: * -> *). Configuration' f -> f Bool
dmqcSigSubmissionClientProtocolTracer  = I Bool
sigSubmissionClientProtocolTracer
    , dmqcSigSubmissionServerProtocolTracer :: forall (f :: * -> *). Configuration' f -> f Bool
dmqcSigSubmissionServerProtocolTracer  = I Bool
sigSubmissionServerProtocolTracer
    , dmqcKeepAliveClientProtocolTracer :: forall (f :: * -> *). Configuration' f -> f Bool
dmqcKeepAliveClientProtocolTracer      = I Bool
keepAliveClientProtocolTracer
    , dmqcKeepAliveServerProtocolTracer :: forall (f :: * -> *). Configuration' f -> f Bool
dmqcKeepAliveServerProtocolTracer      = I Bool
keepAliveServerProtocolTracer
    , dmqcPeerSharingClientProtocolTracer :: forall (f :: * -> *). Configuration' f -> f Bool
dmqcPeerSharingClientProtocolTracer    = I Bool
peerSharingClientProtocolTracer
    , dmqcPeerSharingServerProtocolTracer :: forall (f :: * -> *). Configuration' f -> f Bool
dmqcPeerSharingServerProtocolTracer    = I Bool
peerSharingServerProtocolTracer

    , dmqcSigSubmissionOutboundTracer :: forall (f :: * -> *). Configuration' f -> f Bool
dmqcSigSubmissionOutboundTracer        = I Bool
sigSubmissionOutboundTracer
    , dmqcSigSubmissionInboundTracer :: forall (f :: * -> *). Configuration' f -> f Bool
dmqcSigSubmissionInboundTracer         = I Bool
sigSubmissionInboundTracer
    , dmqcSigSubmissionLogicTracer :: forall (f :: * -> *). Configuration' f -> f Bool
dmqcSigSubmissionLogicTracer           = I Bool
sigSubmissionLogicTracer
    }
    NodeKernel {
      FetchClientRegistry (ConnectionId addr) () () m
fetchClientRegistry :: FetchClientRegistry (ConnectionId addr) () () m
fetchClientRegistry :: forall crypto ntnAddr (m :: * -> *).
NodeKernel crypto ntnAddr m
-> FetchClientRegistry (ConnectionId ntnAddr) () () m
fetchClientRegistry
    , PeerSharingRegistry addr m
peerSharingRegistry :: PeerSharingRegistry addr m
peerSharingRegistry :: forall crypto ntnAddr (m :: * -> *).
NodeKernel crypto ntnAddr m -> PeerSharingRegistry ntnAddr m
peerSharingRegistry
    , PeerSharingAPI addr StdGen m
peerSharingAPI :: PeerSharingAPI addr StdGen m
peerSharingAPI :: forall crypto ntnAddr (m :: * -> *).
NodeKernel crypto ntnAddr m -> PeerSharingAPI ntnAddr StdGen m
peerSharingAPI
    , Mempool m (Sig crypto)
mempool :: Mempool m (Sig crypto)
mempool :: forall crypto ntnAddr (m :: * -> *).
NodeKernel crypto ntnAddr m -> Mempool m (Sig crypto)
mempool
    , TxChannelsVar m addr SigId (Sig crypto)
sigChannelVar :: TxChannelsVar m addr SigId (Sig crypto)
sigChannelVar :: forall crypto ntnAddr (m :: * -> *).
NodeKernel crypto ntnAddr m
-> TxChannelsVar m ntnAddr SigId (Sig crypto)
sigChannelVar
    , TxMempoolSem m
sigMempoolSem :: TxMempoolSem m
sigMempoolSem :: forall crypto ntnAddr (m :: * -> *).
NodeKernel crypto ntnAddr m -> TxMempoolSem m
sigMempoolSem
    , SharedTxStateVar m addr SigId (Sig crypto)
sigSharedTxStateVar :: SharedTxStateVar m addr SigId (Sig crypto)
sigSharedTxStateVar :: forall crypto ntnAddr (m :: * -> *).
NodeKernel crypto ntnAddr m
-> SharedTxStateVar m ntnAddr SigId (Sig crypto)
sigSharedTxStateVar
    }
    Codecs {
      AnnotatedCodec
  (SigSubmission crypto) DeserialiseFailure m ByteString
sigSubmissionCodec :: AnnotatedCodec
  (SigSubmission crypto) DeserialiseFailure m ByteString
sigSubmissionCodec :: forall crypto addr (m :: * -> *).
Codecs crypto addr m
-> AnnotatedCodec
     (SigSubmission crypto) DeserialiseFailure m ByteString
sigSubmissionCodec
    , Codec KeepAlive DeserialiseFailure m ByteString
keepAliveCodec :: Codec KeepAlive DeserialiseFailure m ByteString
keepAliveCodec :: forall crypto addr (m :: * -> *).
Codecs crypto addr m
-> Codec KeepAlive DeserialiseFailure m ByteString
keepAliveCodec
    , Codec (PeerSharing addr) DeserialiseFailure m ByteString
peerSharingCodec :: Codec (PeerSharing addr) DeserialiseFailure m ByteString
peerSharingCodec :: forall crypto addr (m :: * -> *).
Codecs crypto addr m
-> Codec (PeerSharing addr) DeserialiseFailure m ByteString
peerSharingCodec
    }
    LimitsAndTimeouts {
      ProtocolSizeLimits (SigSubmission crypto) ByteString
sigSubmissionSizeLimits :: ProtocolSizeLimits (SigSubmission crypto) ByteString
sigSubmissionSizeLimits :: forall crypto addr.
LimitsAndTimeouts crypto addr
-> ProtocolSizeLimits (SigSubmission crypto) ByteString
sigSubmissionSizeLimits
    , ProtocolTimeLimits (SigSubmission crypto)
sigSubmissionTimeLimits :: ProtocolTimeLimits (SigSubmission crypto)
sigSubmissionTimeLimits :: forall crypto addr.
LimitsAndTimeouts crypto addr
-> ProtocolTimeLimits (SigSubmission crypto)
sigSubmissionTimeLimits
    , ProtocolSizeLimits KeepAlive ByteString
keepAliveSizeLimits :: ProtocolSizeLimits KeepAlive ByteString
keepAliveSizeLimits :: forall crypto addr.
LimitsAndTimeouts crypto addr
-> ProtocolSizeLimits KeepAlive ByteString
keepAliveSizeLimits
    , ProtocolTimeLimits KeepAlive
keepAliveTimeLimits :: ProtocolTimeLimits KeepAlive
keepAliveTimeLimits :: forall crypto addr.
LimitsAndTimeouts crypto addr -> ProtocolTimeLimits KeepAlive
keepAliveTimeLimits
    , ProtocolTimeLimits (PeerSharing addr)
peerSharingTimeLimits :: ProtocolTimeLimits (PeerSharing addr)
peerSharingTimeLimits :: forall crypto addr.
LimitsAndTimeouts crypto addr
-> ProtocolTimeLimits (PeerSharing addr)
peerSharingTimeLimits
    , ProtocolSizeLimits (PeerSharing addr) ByteString
peerSharingSizeLimits :: ProtocolSizeLimits (PeerSharing addr) ByteString
peerSharingSizeLimits :: forall crypto addr.
LimitsAndTimeouts crypto addr
-> ProtocolSizeLimits (PeerSharing addr) ByteString
peerSharingSizeLimits
    }
    TxDecisionPolicy
sigDecisionPolicy
    =
    Apps {
      ClientApp addr m ()
aSigSubmissionClient :: ClientApp addr m ()
aSigSubmissionClient :: ClientApp addr m ()
aSigSubmissionClient
    , ServerApp addr m ()
aSigSubmissionServer :: ServerApp addr m ()
aSigSubmissionServer :: ServerApp addr m ()
aSigSubmissionServer
    , ClientApp addr m ()
aKeepAliveClient :: ClientApp addr m ()
aKeepAliveClient :: ClientApp addr m ()
aKeepAliveClient
    , ServerApp addr m ()
aKeepAliveServer :: ServerApp addr m ()
aKeepAliveServer :: ServerApp addr m ()
aKeepAliveServer
    , ClientApp addr m ()
aPeerSharingClient :: ClientApp addr m ()
aPeerSharingClient :: ClientApp addr m ()
aPeerSharingClient
    , ServerApp addr m ()
aPeerSharingServer :: ServerApp addr m ()
aPeerSharingServer :: ServerApp addr m ()
aPeerSharingServer
    }
  where
    sigSize :: Sig crypto -> SizeInBytes
    sigSize :: Sig crypto -> SizeInBytes
sigSize Sig crypto
_ = SizeInBytes
0 -- TODO

    mempoolReader :: TxSubmissionMempoolReader SigId (Sig crypto) Int m
mempoolReader = (Sig crypto -> SigId)
-> (Sig crypto -> SizeInBytes)
-> Mempool m (Sig crypto)
-> TxSubmissionMempoolReader SigId (Sig crypto) Int m
forall tx txid (m :: * -> *).
(MonadSTM m, Eq txid) =>
(tx -> txid)
-> (tx -> SizeInBytes)
-> Mempool m tx
-> TxSubmissionMempoolReader txid tx Int m
Mempool.getReader Sig crypto -> SigId
forall crypto. Sig crypto -> SigId
sigId Sig crypto -> SizeInBytes
sigSize Mempool m (Sig crypto)
mempool
    -- TODO: invalid signatures are just omitted from the mempool. For DMQ
    -- we need to validate signatures when we received them, and shutdown
    -- connection if we receive one, rather than validate them in the
    -- mempool.
    mempoolWriter :: TxSubmissionMempoolWriter SigId (Sig crypto) Int m
mempoolWriter = (Sig crypto -> SigId)
-> m ()
-> (() -> Sig crypto -> Either Void ())
-> (Void -> Bool)
-> Mempool m (Sig crypto)
-> TxSubmissionMempoolWriter SigId (Sig crypto) Int m
forall tx txid ctx failure (m :: * -> *).
(MonadSTM m, MonadThrow m, Ord txid, Typeable txid,
 Typeable failure, Show txid, Show failure) =>
(tx -> txid)
-> m ctx
-> (ctx -> tx -> Either failure ())
-> (failure -> Bool)
-> Mempool m tx
-> TxSubmissionMempoolWriter txid tx Int m
Mempool.getWriter Sig crypto -> SigId
forall crypto. Sig crypto -> SigId
sigId
                                      (() -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ())
                                      (\()
_ Sig crypto
_ -> () -> Either Void ()
forall a b. b -> Either a b
Right () :: Either Void ())
                                      (\Void
_ -> Bool
True)
                                      Mempool m (Sig crypto)
mempool

    aSigSubmissionClient
      :: NodeToNodeVersion
      -> ExpandedInitiatorContext addr m
      -> Channel m BL.ByteString
      -> m ((), Maybe BL.ByteString)
    aSigSubmissionClient :: ClientApp addr m ()
aSigSubmissionClient NodeToNodeVersion
version
                         ExpandedInitiatorContext {
                           eicConnectionId :: forall addr (m :: * -> *).
ExpandedInitiatorContext addr m -> ConnectionId addr
eicConnectionId   = ConnectionId addr
connId,
                           eicControlMessage :: forall addr (m :: * -> *).
ExpandedInitiatorContext addr m -> ControlMessageSTM m
eicControlMessage = ControlMessageSTM m
controlMessage
                         } Channel m ByteString
channel =
      Tracer m (TraceSendRecv (SigSubmission crypto))
-> AnnotatedCodec
     (SigSubmission crypto) DeserialiseFailure m ByteString
-> ProtocolSizeLimits (SigSubmission crypto) ByteString
-> ProtocolTimeLimits (SigSubmission crypto)
-> Channel m ByteString
-> Peer (SigSubmission crypto) 'AsClient 'NonPipelined 'StInit m ()
-> m ((), Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
       a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
 MonadTimer m, Monoid bytes, ShowProxy ps,
 forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
 Show failure) =>
Tracer m (TraceSendRecv ps)
-> AnnotatedCodec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runAnnotatedPeerWithLimits
        (if Bool
sigSubmissionClientProtocolTracer
          then String
-> WithBearer
     (ConnectionId addr) (TraceSendRecv (SigSubmission crypto))
-> WithEventType
     (WithBearer
        (ConnectionId addr) (TraceSendRecv (SigSubmission crypto)))
forall a. String -> a -> WithEventType a
WithEventType String
"SigSubmission.Protocol.Client" (WithBearer
   (ConnectionId addr) (TraceSendRecv (SigSubmission crypto))
 -> WithEventType
      (WithBearer
         (ConnectionId addr) (TraceSendRecv (SigSubmission crypto))))
-> (TraceSendRecv (SigSubmission crypto)
    -> WithBearer
         (ConnectionId addr) (TraceSendRecv (SigSubmission crypto)))
-> TraceSendRecv (SigSubmission crypto)
-> WithEventType
     (WithBearer
        (ConnectionId addr) (TraceSendRecv (SigSubmission crypto)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionId addr
-> TraceSendRecv (SigSubmission crypto)
-> WithBearer
     (ConnectionId addr) (TraceSendRecv (SigSubmission crypto))
forall peerid a. peerid -> a -> WithBearer peerid a
Mx.WithBearer ConnectionId addr
connId (TraceSendRecv (SigSubmission crypto)
 -> WithEventType
      (WithBearer
         (ConnectionId addr) (TraceSendRecv (SigSubmission crypto))))
-> Tracer
     m
     (WithEventType
        (WithBearer
           (ConnectionId addr) (TraceSendRecv (SigSubmission crypto))))
-> Tracer m (TraceSendRecv (SigSubmission crypto))
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< Tracer
  m
  (WithEventType
     (WithBearer
        (ConnectionId addr) (TraceSendRecv (SigSubmission crypto))))
forall ev. ToJSON ev => Tracer m (WithEventType ev)
tracer
          else Tracer m (TraceSendRecv (SigSubmission crypto))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer)
        AnnotatedCodec
  (SigSubmission crypto) DeserialiseFailure m ByteString
sigSubmissionCodec
        ProtocolSizeLimits (SigSubmission crypto) ByteString
sigSubmissionSizeLimits
        ProtocolTimeLimits (SigSubmission crypto)
sigSubmissionTimeLimits
        Channel m ByteString
channel
        (Peer (SigSubmission crypto) 'AsClient 'NonPipelined 'StInit m ()
 -> m ((), Maybe ByteString))
-> Peer (SigSubmission crypto) 'AsClient 'NonPipelined 'StInit m ()
-> m ((), Maybe ByteString)
forall a b. (a -> b) -> a -> b
$ TxSubmissionClient SigId (Sig crypto) m ()
-> Peer (SigSubmission crypto) 'AsClient 'NonPipelined 'StInit m ()
forall txid tx (m :: * -> *) a.
Monad m =>
TxSubmissionClient txid tx m a
-> Client (TxSubmission2 txid tx) 'NonPipelined 'StInit m a
txSubmissionClientPeer
        (TxSubmissionClient SigId (Sig crypto) m ()
 -> Peer
      (SigSubmission crypto) 'AsClient 'NonPipelined 'StInit m ())
-> TxSubmissionClient SigId (Sig crypto) m ()
-> Peer (SigSubmission crypto) 'AsClient 'NonPipelined 'StInit m ()
forall a b. (a -> b) -> a -> b
$ Tracer m (TraceTxSubmissionOutbound SigId (Sig crypto))
-> NumTxIdsToAck
-> TxSubmissionMempoolReader SigId (Sig crypto) Int m
-> NodeToNodeVersion
-> ControlMessageSTM m
-> TxSubmissionClient SigId (Sig crypto) m ()
forall version txid tx idx (m :: * -> *).
(Ord txid, Ord idx, MonadSTM m, MonadThrow m) =>
Tracer m (TraceTxSubmissionOutbound txid tx)
-> NumTxIdsToAck
-> TxSubmissionMempoolReader txid tx idx m
-> version
-> ControlMessageSTM m
-> TxSubmissionClient txid tx m ()
txSubmissionOutbound
            (if Bool
sigSubmissionOutboundTracer
               then String
-> WithBearer
     (ConnectionId addr) (TraceTxSubmissionOutbound SigId (Sig crypto))
-> WithEventType
     (WithBearer
        (ConnectionId addr) (TraceTxSubmissionOutbound SigId (Sig crypto)))
forall a. String -> a -> WithEventType a
WithEventType String
"SigSubmission.Outbound" (WithBearer
   (ConnectionId addr) (TraceTxSubmissionOutbound SigId (Sig crypto))
 -> WithEventType
      (WithBearer
         (ConnectionId addr)
         (TraceTxSubmissionOutbound SigId (Sig crypto))))
-> (TraceTxSubmissionOutbound SigId (Sig crypto)
    -> WithBearer
         (ConnectionId addr) (TraceTxSubmissionOutbound SigId (Sig crypto)))
-> TraceTxSubmissionOutbound SigId (Sig crypto)
-> WithEventType
     (WithBearer
        (ConnectionId addr) (TraceTxSubmissionOutbound SigId (Sig crypto)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionId addr
-> TraceTxSubmissionOutbound SigId (Sig crypto)
-> WithBearer
     (ConnectionId addr) (TraceTxSubmissionOutbound SigId (Sig crypto))
forall peerid a. peerid -> a -> WithBearer peerid a
Mx.WithBearer ConnectionId addr
connId (TraceTxSubmissionOutbound SigId (Sig crypto)
 -> WithEventType
      (WithBearer
         (ConnectionId addr)
         (TraceTxSubmissionOutbound SigId (Sig crypto))))
-> Tracer
     m
     (WithEventType
        (WithBearer
           (ConnectionId addr)
           (TraceTxSubmissionOutbound SigId (Sig crypto))))
-> Tracer m (TraceTxSubmissionOutbound SigId (Sig crypto))
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< Tracer
  m
  (WithEventType
     (WithBearer
        (ConnectionId addr)
        (TraceTxSubmissionOutbound SigId (Sig crypto))))
forall ev. ToJSON ev => Tracer m (WithEventType ev)
tracer
               else Tracer m (TraceTxSubmissionOutbound SigId (Sig crypto))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer)
            NumTxIdsToAck
_MAX_SIGS_TO_ACK
            TxSubmissionMempoolReader SigId (Sig crypto) Int m
mempoolReader
            NodeToNodeVersion
version
            ControlMessageSTM m
controlMessage


    aSigSubmissionServer
      :: NodeToNodeVersion
      -> ResponderContext addr
      -> Channel m BL.ByteString
      -> m ((), Maybe BL.ByteString)
    aSigSubmissionServer :: ServerApp addr m ()
aSigSubmissionServer NodeToNodeVersion
_version ResponderContext { rcConnectionId :: forall addr. ResponderContext addr -> ConnectionId addr
rcConnectionId = ConnectionId addr
connId } Channel m ByteString
channel =
        Tracer m (TraceTxLogic addr SigId (Sig crypto))
-> TxChannelsVar m addr SigId (Sig crypto)
-> TxMempoolSem m
-> TxDecisionPolicy
-> SharedTxStateVar m addr SigId (Sig crypto)
-> TxSubmissionMempoolReader SigId (Sig crypto) Int m
-> TxSubmissionMempoolWriter SigId (Sig crypto) Int m
-> (Sig crypto -> SizeInBytes)
-> addr
-> (PeerTxAPI m SigId (Sig crypto) -> m ((), Maybe ByteString))
-> m ((), Maybe ByteString)
forall tx peeraddr txid idx (m :: * -> *) a.
(MonadMask m, MonadMVar m, MonadSTM m, MonadMonotonicTime m,
 Ord txid, Show txid, Typeable txid, Ord peeraddr, Show peeraddr) =>
Tracer m (TraceTxLogic peeraddr txid tx)
-> TxChannelsVar m peeraddr txid tx
-> TxMempoolSem m
-> TxDecisionPolicy
-> SharedTxStateVar m peeraddr txid tx
-> TxSubmissionMempoolReader txid tx idx m
-> TxSubmissionMempoolWriter txid tx idx m
-> (tx -> SizeInBytes)
-> peeraddr
-> (PeerTxAPI m txid tx -> m a)
-> m a
SigSubmission.withPeer
          (if Bool
sigSubmissionLogicTracer
             then String
-> WithBearer
     (ConnectionId addr) (TraceTxLogic addr SigId (Sig crypto))
-> WithEventType
     (WithBearer
        (ConnectionId addr) (TraceTxLogic addr SigId (Sig crypto)))
forall a. String -> a -> WithEventType a
WithEventType String
"SigSubmission.Logic" (WithBearer
   (ConnectionId addr) (TraceTxLogic addr SigId (Sig crypto))
 -> WithEventType
      (WithBearer
         (ConnectionId addr) (TraceTxLogic addr SigId (Sig crypto))))
-> (TraceTxLogic addr SigId (Sig crypto)
    -> WithBearer
         (ConnectionId addr) (TraceTxLogic addr SigId (Sig crypto)))
-> TraceTxLogic addr SigId (Sig crypto)
-> WithEventType
     (WithBearer
        (ConnectionId addr) (TraceTxLogic addr SigId (Sig crypto)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionId addr
-> TraceTxLogic addr SigId (Sig crypto)
-> WithBearer
     (ConnectionId addr) (TraceTxLogic addr SigId (Sig crypto))
forall peerid a. peerid -> a -> WithBearer peerid a
Mx.WithBearer ConnectionId addr
connId (TraceTxLogic addr SigId (Sig crypto)
 -> WithEventType
      (WithBearer
         (ConnectionId addr) (TraceTxLogic addr SigId (Sig crypto))))
-> Tracer
     m
     (WithEventType
        (WithBearer
           (ConnectionId addr) (TraceTxLogic addr SigId (Sig crypto))))
-> Tracer m (TraceTxLogic addr SigId (Sig crypto))
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< Tracer
  m
  (WithEventType
     (WithBearer
        (ConnectionId addr) (TraceTxLogic addr SigId (Sig crypto))))
forall ev. ToJSON ev => Tracer m (WithEventType ev)
tracer
             else Tracer m (TraceTxLogic addr SigId (Sig crypto))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer)
          TxChannelsVar m addr SigId (Sig crypto)
sigChannelVar
          TxMempoolSem m
sigMempoolSem
          TxDecisionPolicy
sigDecisionPolicy
          SharedTxStateVar m addr SigId (Sig crypto)
sigSharedTxStateVar
          TxSubmissionMempoolReader SigId (Sig crypto) Int m
mempoolReader
          TxSubmissionMempoolWriter SigId (Sig crypto) Int m
mempoolWriter
          Sig crypto -> SizeInBytes
sigSize
          (ConnectionId addr -> addr
forall addr. ConnectionId addr -> addr
remoteAddress ConnectionId addr
connId)
          ((PeerTxAPI m SigId (Sig crypto) -> m ((), Maybe ByteString))
 -> m ((), Maybe ByteString))
-> (PeerTxAPI m SigId (Sig crypto) -> m ((), Maybe ByteString))
-> m ((), Maybe ByteString)
forall a b. (a -> b) -> a -> b
$ \(PeerTxAPI m SigId (Sig crypto)
peerSigAPI :: PeerTxAPI m SigId (Sig crypto)) ->
            Tracer m (TraceSendRecv (SigSubmission crypto))
-> AnnotatedCodec
     (SigSubmission crypto) DeserialiseFailure m ByteString
-> ProtocolSizeLimits (SigSubmission crypto) ByteString
-> ProtocolTimeLimits (SigSubmission crypto)
-> Channel m ByteString
-> PeerPipelined (SigSubmission crypto) 'AsServer 'StInit m ()
-> m ((), Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
       a.
(MonadAsync m, MonadFork m, MonadMask m, MonadTimer m,
 MonadThrow (STM m), Monoid bytes, ShowProxy ps,
 forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
 Show failure) =>
Tracer m (TraceSendRecv ps)
-> AnnotatedCodec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> PeerPipelined ps pr st m a
-> m (a, Maybe bytes)
runPipelinedAnnotatedPeerWithLimits
              (if Bool
sigSubmissionServerProtocolTracer
                 then String
-> WithBearer
     (ConnectionId addr) (TraceSendRecv (SigSubmission crypto))
-> WithEventType
     (WithBearer
        (ConnectionId addr) (TraceSendRecv (SigSubmission crypto)))
forall a. String -> a -> WithEventType a
WithEventType String
"SigSubmission.Protocol.Server" (WithBearer
   (ConnectionId addr) (TraceSendRecv (SigSubmission crypto))
 -> WithEventType
      (WithBearer
         (ConnectionId addr) (TraceSendRecv (SigSubmission crypto))))
-> (TraceSendRecv (SigSubmission crypto)
    -> WithBearer
         (ConnectionId addr) (TraceSendRecv (SigSubmission crypto)))
-> TraceSendRecv (SigSubmission crypto)
-> WithEventType
     (WithBearer
        (ConnectionId addr) (TraceSendRecv (SigSubmission crypto)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionId addr
-> TraceSendRecv (SigSubmission crypto)
-> WithBearer
     (ConnectionId addr) (TraceSendRecv (SigSubmission crypto))
forall peerid a. peerid -> a -> WithBearer peerid a
Mx.WithBearer ConnectionId addr
connId (TraceSendRecv (SigSubmission crypto)
 -> WithEventType
      (WithBearer
         (ConnectionId addr) (TraceSendRecv (SigSubmission crypto))))
-> Tracer
     m
     (WithEventType
        (WithBearer
           (ConnectionId addr) (TraceSendRecv (SigSubmission crypto))))
-> Tracer m (TraceSendRecv (SigSubmission crypto))
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< Tracer
  m
  (WithEventType
     (WithBearer
        (ConnectionId addr) (TraceSendRecv (SigSubmission crypto))))
forall ev. ToJSON ev => Tracer m (WithEventType ev)
tracer
                 else Tracer m (TraceSendRecv (SigSubmission crypto))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer)
              AnnotatedCodec
  (SigSubmission crypto) DeserialiseFailure m ByteString
sigSubmissionCodec
              ProtocolSizeLimits (SigSubmission crypto) ByteString
sigSubmissionSizeLimits
              ProtocolTimeLimits (SigSubmission crypto)
sigSubmissionTimeLimits
              Channel m ByteString
channel
              (PeerPipelined (SigSubmission crypto) 'AsServer 'StInit m ()
 -> m ((), Maybe ByteString))
-> PeerPipelined (SigSubmission crypto) 'AsServer 'StInit m ()
-> m ((), Maybe ByteString)
forall a b. (a -> b) -> a -> b
$ TxSubmissionServerPipelined SigId (Sig crypto) m ()
-> PeerPipelined (SigSubmission crypto) 'AsServer 'StInit m ()
forall txid tx (m :: * -> *) a.
Functor m =>
TxSubmissionServerPipelined txid tx m a
-> ServerPipelined (TxSubmission2 txid tx) 'StInit m a
txSubmissionServerPeerPipelined
              (TxSubmissionServerPipelined SigId (Sig crypto) m ()
 -> PeerPipelined (SigSubmission crypto) 'AsServer 'StInit m ())
-> TxSubmissionServerPipelined SigId (Sig crypto) m ()
-> PeerPipelined (SigSubmission crypto) 'AsServer 'StInit m ()
forall a b. (a -> b) -> a -> b
$ Tracer m (TraceTxSubmissionInbound SigId (Sig crypto))
-> TxSubmissionInitDelay
-> TxSubmissionMempoolWriter SigId (Sig crypto) Int m
-> PeerTxAPI m SigId (Sig crypto)
-> TxSubmissionServerPipelined SigId (Sig crypto) m ()
forall txid tx idx (m :: * -> *).
(MonadDelay m, MonadThrow m, Ord txid) =>
Tracer m (TraceTxSubmissionInbound txid tx)
-> TxSubmissionInitDelay
-> TxSubmissionMempoolWriter txid tx idx m
-> PeerTxAPI m txid tx
-> TxSubmissionServerPipelined txid tx m ()
txSubmissionInboundV2
                  (if Bool
sigSubmissionInboundTracer
                     then String
-> WithBearer
     (ConnectionId addr) (TraceTxSubmissionInbound SigId (Sig crypto))
-> WithEventType
     (WithBearer
        (ConnectionId addr) (TraceTxSubmissionInbound SigId (Sig crypto)))
forall a. String -> a -> WithEventType a
WithEventType String
"SigSubmission.Inbound" (WithBearer
   (ConnectionId addr) (TraceTxSubmissionInbound SigId (Sig crypto))
 -> WithEventType
      (WithBearer
         (ConnectionId addr) (TraceTxSubmissionInbound SigId (Sig crypto))))
-> (TraceTxSubmissionInbound SigId (Sig crypto)
    -> WithBearer
         (ConnectionId addr) (TraceTxSubmissionInbound SigId (Sig crypto)))
-> TraceTxSubmissionInbound SigId (Sig crypto)
-> WithEventType
     (WithBearer
        (ConnectionId addr) (TraceTxSubmissionInbound SigId (Sig crypto)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionId addr
-> TraceTxSubmissionInbound SigId (Sig crypto)
-> WithBearer
     (ConnectionId addr) (TraceTxSubmissionInbound SigId (Sig crypto))
forall peerid a. peerid -> a -> WithBearer peerid a
Mx.WithBearer ConnectionId addr
connId (TraceTxSubmissionInbound SigId (Sig crypto)
 -> WithEventType
      (WithBearer
         (ConnectionId addr) (TraceTxSubmissionInbound SigId (Sig crypto))))
-> Tracer
     m
     (WithEventType
        (WithBearer
           (ConnectionId addr) (TraceTxSubmissionInbound SigId (Sig crypto))))
-> Tracer m (TraceTxSubmissionInbound SigId (Sig crypto))
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< Tracer
  m
  (WithEventType
     (WithBearer
        (ConnectionId addr) (TraceTxSubmissionInbound SigId (Sig crypto))))
forall ev. ToJSON ev => Tracer m (WithEventType ev)
tracer
                     else Tracer m (TraceTxSubmissionInbound SigId (Sig crypto))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer)
                  TxSubmissionInitDelay
_SIG_SUBMISSION_INIT_DELAY
                  TxSubmissionMempoolWriter SigId (Sig crypto) Int m
mempoolWriter
                  PeerTxAPI m SigId (Sig crypto)
peerSigAPI


    aKeepAliveClient
      :: NodeToNodeVersion
      -> ExpandedInitiatorContext addr m
      -> Channel m BL.ByteString
      -> m ((), Maybe BL.ByteString)
    aKeepAliveClient :: ClientApp addr m ()
aKeepAliveClient NodeToNodeVersion
_version
                     ExpandedInitiatorContext {
                       eicConnectionId :: forall addr (m :: * -> *).
ExpandedInitiatorContext addr m -> ConnectionId addr
eicConnectionId   = ConnectionId addr
connId
                     , eicControlMessage :: forall addr (m :: * -> *).
ExpandedInitiatorContext addr m -> ControlMessageSTM m
eicControlMessage = ControlMessageSTM m
controlMessageSTM
                     }
                     Channel m ByteString
channel = do
      String -> m ()
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread String
"KeepAlive.Client"
      let kacApp :: StrictTVar m (Map (ConnectionId addr) PeerGSV)
-> m ((), Maybe ByteString)
kacApp StrictTVar m (Map (ConnectionId addr) PeerGSV)
dqCtx =
            Tracer m (TraceSendRecv KeepAlive)
-> Codec KeepAlive DeserialiseFailure m ByteString
-> ProtocolSizeLimits KeepAlive ByteString
-> ProtocolTimeLimits KeepAlive
-> Channel m ByteString
-> Peer KeepAlive 'AsClient 'NonPipelined 'StClient m ()
-> m ((), Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
       a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
 MonadTimer m, ShowProxy ps,
 forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
 Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runPeerWithLimits
              (if Bool
keepAliveClientProtocolTracer
                 then String
-> WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive)
-> WithEventType
     (WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive))
forall a. String -> a -> WithEventType a
WithEventType String
"KeepAlive.Protocol.Client" (WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive)
 -> WithEventType
      (WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive)))
-> (TraceSendRecv KeepAlive
    -> WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive))
-> TraceSendRecv KeepAlive
-> WithEventType
     (WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionId addr
-> TraceSendRecv KeepAlive
-> WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive)
forall peerid a. peerid -> a -> WithBearer peerid a
Mx.WithBearer ConnectionId addr
connId (TraceSendRecv KeepAlive
 -> WithEventType
      (WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive)))
-> Tracer
     m
     (WithEventType
        (WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive)))
-> Tracer m (TraceSendRecv KeepAlive)
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< Tracer
  m
  (WithEventType
     (WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive)))
forall ev. ToJSON ev => Tracer m (WithEventType ev)
tracer
                 else Tracer m (TraceSendRecv KeepAlive)
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer)
              Codec KeepAlive DeserialiseFailure m ByteString
keepAliveCodec
              ProtocolSizeLimits KeepAlive ByteString
keepAliveSizeLimits
              ProtocolTimeLimits KeepAlive
keepAliveTimeLimits
              Channel m ByteString
channel
              (Peer KeepAlive 'AsClient 'NonPipelined 'StClient m ()
 -> m ((), Maybe ByteString))
-> Peer KeepAlive 'AsClient 'NonPipelined 'StClient m ()
-> m ((), Maybe ByteString)
forall a b. (a -> b) -> a -> b
$ KeepAliveClient m ()
-> Peer KeepAlive 'AsClient 'NonPipelined 'StClient m ()
forall (m :: * -> *) a.
MonadThrow m =>
KeepAliveClient m a -> Client KeepAlive 'NonPipelined 'StClient m a
keepAliveClientPeer
              (KeepAliveClient m ()
 -> Peer KeepAlive 'AsClient 'NonPipelined 'StClient m ())
-> KeepAliveClient m ()
-> Peer KeepAlive 'AsClient 'NonPipelined 'StClient m ()
forall a b. (a -> b) -> a -> b
$ Tracer m (TraceKeepAliveClient (ConnectionId addr))
-> StdGen
-> ControlMessageSTM m
-> ConnectionId addr
-> StrictTVar m (Map (ConnectionId addr) PeerGSV)
-> KeepAliveInterval
-> KeepAliveClient m ()
forall (m :: * -> *) peer.
(MonadTimer m, Ord peer) =>
Tracer m (TraceKeepAliveClient peer)
-> StdGen
-> ControlMessageSTM m
-> peer
-> StrictTVar m (Map peer PeerGSV)
-> KeepAliveInterval
-> KeepAliveClient m ()
keepAliveClient Tracer m (TraceKeepAliveClient (ConnectionId addr))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
                                (Int -> StdGen
mkStdGen Int
0)
                                ControlMessageSTM m
controlMessageSTM
                                ConnectionId addr
connId
                                StrictTVar m (Map (ConnectionId addr) PeerGSV)
dqCtx
                                (DiffTime -> KeepAliveInterval
KeepAliveInterval DiffTime
10)

      ((), trailing) <- FetchClientRegistry (ConnectionId addr) () () m
-> ConnectionId addr
-> (StrictTVar m (Map (ConnectionId addr) PeerGSV)
    -> m ((), Maybe ByteString))
-> m ((), Maybe ByteString)
forall (m :: * -> *) a peer header block.
(MonadSTM m, MonadFork m, MonadMask m, Ord peer) =>
FetchClientRegistry peer header block m
-> peer -> (StrictTVar m (Map peer PeerGSV) -> m a) -> m a
bracketKeepAliveClient FetchClientRegistry (ConnectionId addr) () () m
fetchClientRegistry ConnectionId addr
connId StrictTVar m (Map (ConnectionId addr) PeerGSV)
-> m ((), Maybe ByteString)
kacApp
      return ((), trailing)

    aKeepAliveServer
      :: NodeToNodeVersion
      -> ResponderContext addr
      -> Channel m BL.ByteString
      -> m ((), Maybe BL.ByteString)
    aKeepAliveServer :: ServerApp addr m ()
aKeepAliveServer NodeToNodeVersion
_version
                     ResponderContext {
                       rcConnectionId :: forall addr. ResponderContext addr -> ConnectionId addr
rcConnectionId = ConnectionId addr
connId
                     }
                     Channel m ByteString
channel = do
      String -> m ()
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread String
"KeepAlive.Server"
      Tracer m (TraceSendRecv KeepAlive)
-> Codec KeepAlive DeserialiseFailure m ByteString
-> ProtocolSizeLimits KeepAlive ByteString
-> ProtocolTimeLimits KeepAlive
-> Channel m ByteString
-> Peer KeepAlive 'AsServer 'NonPipelined 'StClient m ()
-> m ((), Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
       a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
 MonadTimer m, ShowProxy ps,
 forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
 Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runPeerWithLimits
        (if Bool
keepAliveServerProtocolTracer
           then String
-> WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive)
-> WithEventType
     (WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive))
forall a. String -> a -> WithEventType a
WithEventType String
"KeepAlive.Protocol.Server" (WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive)
 -> WithEventType
      (WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive)))
-> (TraceSendRecv KeepAlive
    -> WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive))
-> TraceSendRecv KeepAlive
-> WithEventType
     (WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionId addr
-> TraceSendRecv KeepAlive
-> WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive)
forall peerid a. peerid -> a -> WithBearer peerid a
Mx.WithBearer ConnectionId addr
connId (TraceSendRecv KeepAlive
 -> WithEventType
      (WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive)))
-> Tracer
     m
     (WithEventType
        (WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive)))
-> Tracer m (TraceSendRecv KeepAlive)
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< Tracer
  m
  (WithEventType
     (WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive)))
forall ev. ToJSON ev => Tracer m (WithEventType ev)
tracer
           else Tracer m (TraceSendRecv KeepAlive)
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer)
        Codec KeepAlive DeserialiseFailure m ByteString
keepAliveCodec
        ProtocolSizeLimits KeepAlive ByteString
keepAliveSizeLimits
        ProtocolTimeLimits KeepAlive
keepAliveTimeLimits
        Channel m ByteString
channel
        (Peer KeepAlive 'AsServer 'NonPipelined 'StClient m ()
 -> m ((), Maybe ByteString))
-> Peer KeepAlive 'AsServer 'NonPipelined 'StClient m ()
-> m ((), Maybe ByteString)
forall a b. (a -> b) -> a -> b
$ KeepAliveServer m ()
-> Peer KeepAlive 'AsServer 'NonPipelined 'StClient m ()
forall (m :: * -> *) a.
Functor m =>
KeepAliveServer m a -> Server KeepAlive 'NonPipelined 'StClient m a
keepAliveServerPeer
          KeepAliveServer m ()
forall (m :: * -> *). Applicative m => KeepAliveServer m ()
keepAliveServer

    aPeerSharingClient
      :: NodeToNodeVersion
      -> ExpandedInitiatorContext addr m
      -> Channel m BL.ByteString
      -> m ((), Maybe BL.ByteString)
    aPeerSharingClient :: ClientApp addr m ()
aPeerSharingClient NodeToNodeVersion
_version
                       ExpandedInitiatorContext {
                         eicConnectionId :: forall addr (m :: * -> *).
ExpandedInitiatorContext addr m -> ConnectionId addr
eicConnectionId   = ConnectionId addr
connId
                       , eicControlMessage :: forall addr (m :: * -> *).
ExpandedInitiatorContext addr m -> ControlMessageSTM m
eicControlMessage = ControlMessageSTM m
controlMessageSTM
                       }
                       Channel m ByteString
channel = do
      String -> m ()
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread String
"PeerSharing.Client"
      PeerSharingRegistry addr m
-> addr
-> (PeerSharingController addr m -> m ((), Maybe ByteString))
-> m ((), Maybe ByteString)
forall peer (m :: * -> *) a.
(Ord peer, MonadSTM m, MonadThrow m) =>
PeerSharingRegistry peer m
-> peer -> (PeerSharingController peer m -> m a) -> m a
bracketPeerSharingClient PeerSharingRegistry addr m
peerSharingRegistry (ConnectionId addr -> addr
forall addr. ConnectionId addr -> addr
remoteAddress ConnectionId addr
connId)
        ((PeerSharingController addr m -> m ((), Maybe ByteString))
 -> m ((), Maybe ByteString))
-> (PeerSharingController addr m -> m ((), Maybe ByteString))
-> m ((), Maybe ByteString)
forall a b. (a -> b) -> a -> b
$ \PeerSharingController addr m
controller -> do
          psClient <- ControlMessageSTM m
-> PeerSharingController addr m -> m (PeerSharingClient addr m ())
forall (m :: * -> *) peer.
(Alternative (STM m), MonadMVar m, MonadSTM m, MonadThrow m) =>
ControlMessageSTM m
-> PeerSharingController peer m -> m (PeerSharingClient peer m ())
peerSharingClient ControlMessageSTM m
controlMessageSTM PeerSharingController addr m
controller
          ((), trailing) <- runPeerWithLimits
            (if peerSharingClientProtocolTracer
               then WithEventType "PeerSharing.Protocol.Client" . Mx.WithBearer connId >$< tracer
               else nullTracer)
            peerSharingCodec
            peerSharingSizeLimits
            peerSharingTimeLimits
            channel
            (peerSharingClientPeer psClient)
          return ((), trailing)

    aPeerSharingServer
      :: NodeToNodeVersion
      -> ResponderContext addr
      -> Channel m BL.ByteString
      -> m ((), Maybe BL.ByteString)
    aPeerSharingServer :: ServerApp addr m ()
aPeerSharingServer NodeToNodeVersion
_version
                       ResponderContext {
                         rcConnectionId :: forall addr. ResponderContext addr -> ConnectionId addr
rcConnectionId = ConnectionId addr
connId
                       }
                       Channel m ByteString
channel = do
      String -> m ()
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread String
"PeerSharing.Server"
      Tracer m (TraceSendRecv (PeerSharing addr))
-> Codec (PeerSharing addr) DeserialiseFailure m ByteString
-> ProtocolSizeLimits (PeerSharing addr) ByteString
-> ProtocolTimeLimits (PeerSharing addr)
-> Channel m ByteString
-> Peer (PeerSharing addr) 'AsServer 'NonPipelined 'StIdle m ()
-> m ((), Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
       a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
 MonadTimer m, ShowProxy ps,
 forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
 Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runPeerWithLimits
        (if Bool
peerSharingServerProtocolTracer
           then String
-> WithBearer
     (ConnectionId addr) (TraceSendRecv (PeerSharing addr))
-> WithEventType
     (WithBearer (ConnectionId addr) (TraceSendRecv (PeerSharing addr)))
forall a. String -> a -> WithEventType a
WithEventType String
"PeerSharing.Protocol.Server" (WithBearer (ConnectionId addr) (TraceSendRecv (PeerSharing addr))
 -> WithEventType
      (WithBearer
         (ConnectionId addr) (TraceSendRecv (PeerSharing addr))))
-> (TraceSendRecv (PeerSharing addr)
    -> WithBearer
         (ConnectionId addr) (TraceSendRecv (PeerSharing addr)))
-> TraceSendRecv (PeerSharing addr)
-> WithEventType
     (WithBearer (ConnectionId addr) (TraceSendRecv (PeerSharing addr)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionId addr
-> TraceSendRecv (PeerSharing addr)
-> WithBearer
     (ConnectionId addr) (TraceSendRecv (PeerSharing addr))
forall peerid a. peerid -> a -> WithBearer peerid a
Mx.WithBearer ConnectionId addr
connId (TraceSendRecv (PeerSharing addr)
 -> WithEventType
      (WithBearer
         (ConnectionId addr) (TraceSendRecv (PeerSharing addr))))
-> Tracer
     m
     (WithEventType
        (WithBearer
           (ConnectionId addr) (TraceSendRecv (PeerSharing addr))))
-> Tracer m (TraceSendRecv (PeerSharing addr))
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< Tracer
  m
  (WithEventType
     (WithBearer
        (ConnectionId addr) (TraceSendRecv (PeerSharing addr))))
forall ev. ToJSON ev => Tracer m (WithEventType ev)
tracer
           else Tracer m (TraceSendRecv (PeerSharing addr))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer)
        Codec (PeerSharing addr) DeserialiseFailure m ByteString
peerSharingCodec
        ProtocolSizeLimits (PeerSharing addr) ByteString
peerSharingSizeLimits
        ProtocolTimeLimits (PeerSharing addr)
peerSharingTimeLimits
        Channel m ByteString
channel
        (Peer (PeerSharing addr) 'AsServer 'NonPipelined 'StIdle m ()
 -> m ((), Maybe ByteString))
-> Peer (PeerSharing addr) 'AsServer 'NonPipelined 'StIdle m ()
-> m ((), Maybe ByteString)
forall a b. (a -> b) -> a -> b
$ PeerSharingServer addr m
-> Peer (PeerSharing addr) 'AsServer 'NonPipelined 'StIdle m ()
forall (m :: * -> *) peerAddress.
Monad m =>
PeerSharingServer peerAddress m
-> Server (PeerSharing peerAddress) 'NonPipelined 'StIdle m ()
peerSharingServerPeer
        (PeerSharingServer addr m
 -> Peer (PeerSharing addr) 'AsServer 'NonPipelined 'StIdle m ())
-> PeerSharingServer addr m
-> Peer (PeerSharing addr) 'AsServer 'NonPipelined 'StIdle m ()
forall a b. (a -> b) -> a -> b
$ PeerSharingAPI addr StdGen m -> PeerSharingServer addr m
forall (m :: * -> *) peer s.
(MonadSTM m, MonadMonotonicTime m, Hashable peer, RandomGen s) =>
PeerSharingAPI peer s m -> PeerSharingServer peer m
peerSharingServer PeerSharingAPI addr StdGen m
peerSharingAPI


data Protocols appType initiatorCtx responderCtx bytes m a b =
  Protocols {
    forall (appType :: Mode) initiatorCtx responderCtx bytes
       (m :: * -> *) a b.
Protocols appType initiatorCtx responderCtx bytes m a b
-> RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
sigSubmissionProtocol :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b

    -- | keep-alive mini-protocol
    --
  , forall (appType :: Mode) initiatorCtx responderCtx bytes
       (m :: * -> *) a b.
Protocols appType initiatorCtx responderCtx bytes m a b
-> RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
keepAliveProtocol    :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b

    -- | peer sharing mini-protocol
    --
  , forall (appType :: Mode) initiatorCtx responderCtx bytes
       (m :: * -> *) a b.
Protocols appType initiatorCtx responderCtx bytes m a b
-> RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
peerSharingProtocol  :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
  }

sigSubmissionMiniProtocolNum :: Mx.MiniProtocolNum
sigSubmissionMiniProtocolNum :: MiniProtocolNum
sigSubmissionMiniProtocolNum = Word16 -> MiniProtocolNum
Mx.MiniProtocolNum Word16
11

keepAliveMiniProtocolNum :: Mx.MiniProtocolNum
keepAliveMiniProtocolNum :: MiniProtocolNum
keepAliveMiniProtocolNum = Word16 -> MiniProtocolNum
Mx.MiniProtocolNum Word16
12

peerSharingMiniProtocolNum :: Mx.MiniProtocolNum
peerSharingMiniProtocolNum :: MiniProtocolNum
peerSharingMiniProtocolNum = Word16 -> MiniProtocolNum
Mx.MiniProtocolNum Word16
13

nodeToNodeProtocols
  :: LimitsAndTimeouts crypto addr
  -> Protocols appType initiatorCtx responderCtx bytes m a b
  -> NodeToNodeVersion
  -- ^ negotiated version number
  -> NodeToNodeVersionData
  -- ^ negotiated version data
  -> OuroborosBundle appType initiatorCtx responderCtx bytes m a b
nodeToNodeProtocols :: forall crypto addr (appType :: Mode) initiatorCtx responderCtx
       bytes (m :: * -> *) a b.
LimitsAndTimeouts crypto addr
-> Protocols appType initiatorCtx responderCtx bytes m a b
-> NodeToNodeVersion
-> NodeToNodeVersionData
-> OuroborosBundle appType initiatorCtx responderCtx bytes m a b
nodeToNodeProtocols LimitsAndTimeouts {
                      MiniProtocolLimits
sigSubmissionLimits :: MiniProtocolLimits
sigSubmissionLimits :: forall crypto addr.
LimitsAndTimeouts crypto addr -> MiniProtocolLimits
sigSubmissionLimits
                    , MiniProtocolLimits
keepAliveLimits :: MiniProtocolLimits
keepAliveLimits :: forall crypto addr.
LimitsAndTimeouts crypto addr -> MiniProtocolLimits
keepAliveLimits
                    , MiniProtocolLimits
peerSharingLimits :: MiniProtocolLimits
peerSharingLimits :: forall crypto addr.
LimitsAndTimeouts crypto addr -> MiniProtocolLimits
peerSharingLimits
                    }
                    Protocols {
                      RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
sigSubmissionProtocol :: forall (appType :: Mode) initiatorCtx responderCtx bytes
       (m :: * -> *) a b.
Protocols appType initiatorCtx responderCtx bytes m a b
-> RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
sigSubmissionProtocol :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
sigSubmissionProtocol
                    , RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
keepAliveProtocol :: forall (appType :: Mode) initiatorCtx responderCtx bytes
       (m :: * -> *) a b.
Protocols appType initiatorCtx responderCtx bytes m a b
-> RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
keepAliveProtocol :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
keepAliveProtocol
                    , RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
peerSharingProtocol :: forall (appType :: Mode) initiatorCtx responderCtx bytes
       (m :: * -> *) a b.
Protocols appType initiatorCtx responderCtx bytes m a b
-> RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
peerSharingProtocol :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
peerSharingProtocol
                    }
                    NodeToNodeVersion
_version
                    NodeToNodeVersionData {
                      PeerSharing
peerSharing :: PeerSharing
peerSharing :: NodeToNodeVersionData -> PeerSharing
peerSharing
                    }
                    =
    WithProtocolTemperature
  'Hot [MiniProtocol appType initiatorCtx responderCtx bytes m a b]
-> WithProtocolTemperature
     'Warm [MiniProtocol appType initiatorCtx responderCtx bytes m a b]
-> WithProtocolTemperature
     'Established
     [MiniProtocol appType initiatorCtx responderCtx bytes m a b]
-> TemperatureBundle
     [MiniProtocol appType initiatorCtx responderCtx bytes m a b]
forall a.
WithProtocolTemperature 'Hot a
-> WithProtocolTemperature 'Warm a
-> WithProtocolTemperature 'Established a
-> TemperatureBundle a
TemperatureBundle
      -- Hot protocols
      ([MiniProtocol appType initiatorCtx responderCtx bytes m a b]
-> WithProtocolTemperature
     'Hot [MiniProtocol appType initiatorCtx responderCtx bytes m a b]
forall a. a -> WithProtocolTemperature 'Hot a
WithHot [
        MiniProtocol {
          miniProtocolNum :: MiniProtocolNum
miniProtocolNum    = MiniProtocolNum
sigSubmissionMiniProtocolNum
        , miniProtocolStart :: StartOnDemandOrEagerly
miniProtocolStart  = StartOnDemandOrEagerly
StartOnDemandAny
        , miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits = MiniProtocolLimits
sigSubmissionLimits
        , miniProtocolRun :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
miniProtocolRun    = RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
sigSubmissionProtocol
        }
      ])

      -- Warm protocols
      ([MiniProtocol appType initiatorCtx responderCtx bytes m a b]
-> WithProtocolTemperature
     'Warm [MiniProtocol appType initiatorCtx responderCtx bytes m a b]
forall a. a -> WithProtocolTemperature 'Warm a
WithWarm [])

      -- Established protocols: 'keep-alive', 'peer-sharing'.
      ([MiniProtocol appType initiatorCtx responderCtx bytes m a b]
-> WithProtocolTemperature
     'Established
     [MiniProtocol appType initiatorCtx responderCtx bytes m a b]
forall a. a -> WithProtocolTemperature 'Established a
WithEstablished ([MiniProtocol appType initiatorCtx responderCtx bytes m a b]
 -> WithProtocolTemperature
      'Established
      [MiniProtocol appType initiatorCtx responderCtx bytes m a b])
-> [MiniProtocol appType initiatorCtx responderCtx bytes m a b]
-> WithProtocolTemperature
     'Established
     [MiniProtocol appType initiatorCtx responderCtx bytes m a b]
forall a b. (a -> b) -> a -> b
$
        MiniProtocol {
          miniProtocolNum :: MiniProtocolNum
miniProtocolNum    = MiniProtocolNum
keepAliveMiniProtocolNum
        , miniProtocolStart :: StartOnDemandOrEagerly
miniProtocolStart  = StartOnDemandOrEagerly
StartOnDemandAny
        , miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits = MiniProtocolLimits
keepAliveLimits
        , miniProtocolRun :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
miniProtocolRun    = RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
keepAliveProtocol
        }
        MiniProtocol appType initiatorCtx responderCtx bytes m a b
-> [MiniProtocol appType initiatorCtx responderCtx bytes m a b]
-> [MiniProtocol appType initiatorCtx responderCtx bytes m a b]
forall a. a -> [a] -> [a]
: case PeerSharing
peerSharing of
            PeerSharing
PeerSharingEnabled ->
              [ MiniProtocol {
                  miniProtocolNum :: MiniProtocolNum
miniProtocolNum    = MiniProtocolNum
peerSharingMiniProtocolNum
                , miniProtocolStart :: StartOnDemandOrEagerly
miniProtocolStart  = StartOnDemandOrEagerly
StartOnDemand
                , miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits = MiniProtocolLimits
peerSharingLimits
                , miniProtocolRun :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
miniProtocolRun    = RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
peerSharingProtocol
                }
              ]
            PeerSharing
PeerSharingDisabled ->
              []
      )

initiatorProtocols
  :: LimitsAndTimeouts crypto addr
  -> Apps addr m a b
  -> NodeToNodeVersion
  -> NodeToNodeVersionData
  -> OuroborosBundleWithExpandedCtx 'InitiatorMode addr BL.ByteString m a Void
initiatorProtocols :: forall crypto addr (m :: * -> *) a b.
LimitsAndTimeouts crypto addr
-> Apps addr m a b
-> NodeToNodeVersion
-> NodeToNodeVersionData
-> OuroborosBundleWithExpandedCtx
     'InitiatorMode addr ByteString m a Void
initiatorProtocols LimitsAndTimeouts crypto addr
limitsAndTimeouts
                   Apps {
                     ClientApp addr m a
aSigSubmissionClient :: forall addr (m :: * -> *) a b.
Apps addr m a b -> ClientApp addr m a
aSigSubmissionClient :: ClientApp addr m a
aSigSubmissionClient
                   , ClientApp addr m a
aKeepAliveClient :: forall addr (m :: * -> *) a b.
Apps addr m a b -> ClientApp addr m a
aKeepAliveClient :: ClientApp addr m a
aKeepAliveClient
                   , ClientApp addr m a
aPeerSharingClient :: forall addr (m :: * -> *) a b.
Apps addr m a b -> ClientApp addr m a
aPeerSharingClient :: ClientApp addr m a
aPeerSharingClient
                   }
                   NodeToNodeVersion
version =
  LimitsAndTimeouts crypto addr
-> Protocols
     'InitiatorMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     Void
-> NodeToNodeVersion
-> NodeToNodeVersionData
-> OuroborosBundle
     'InitiatorMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     Void
forall crypto addr (appType :: Mode) initiatorCtx responderCtx
       bytes (m :: * -> *) a b.
LimitsAndTimeouts crypto addr
-> Protocols appType initiatorCtx responderCtx bytes m a b
-> NodeToNodeVersion
-> NodeToNodeVersionData
-> OuroborosBundle appType initiatorCtx responderCtx bytes m a b
nodeToNodeProtocols
    LimitsAndTimeouts crypto addr
limitsAndTimeouts
    (Protocols {
      sigSubmissionProtocol :: RunMiniProtocol
  'InitiatorMode
  (ExpandedInitiatorContext addr m)
  (ResponderContext addr)
  ByteString
  m
  a
  Void
sigSubmissionProtocol =
        MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
-> RunMiniProtocol
     'InitiatorMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     Void
forall initiatorCtx bytes (m :: * -> *) a responderCtx.
MiniProtocolCb initiatorCtx bytes m a
-> RunMiniProtocol
     'InitiatorMode initiatorCtx responderCtx bytes m a Void
InitiatorProtocolOnly ((ExpandedInitiatorContext addr m
 -> Channel m ByteString -> m (a, Maybe ByteString))
-> MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb (ClientApp addr m a
aSigSubmissionClient NodeToNodeVersion
version))
    , keepAliveProtocol :: RunMiniProtocol
  'InitiatorMode
  (ExpandedInitiatorContext addr m)
  (ResponderContext addr)
  ByteString
  m
  a
  Void
keepAliveProtocol =
        MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
-> RunMiniProtocol
     'InitiatorMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     Void
forall initiatorCtx bytes (m :: * -> *) a responderCtx.
MiniProtocolCb initiatorCtx bytes m a
-> RunMiniProtocol
     'InitiatorMode initiatorCtx responderCtx bytes m a Void
InitiatorProtocolOnly ((ExpandedInitiatorContext addr m
 -> Channel m ByteString -> m (a, Maybe ByteString))
-> MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb (ClientApp addr m a
aKeepAliveClient NodeToNodeVersion
version))
    , peerSharingProtocol :: RunMiniProtocol
  'InitiatorMode
  (ExpandedInitiatorContext addr m)
  (ResponderContext addr)
  ByteString
  m
  a
  Void
peerSharingProtocol =
        MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
-> RunMiniProtocol
     'InitiatorMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     Void
forall initiatorCtx bytes (m :: * -> *) a responderCtx.
MiniProtocolCb initiatorCtx bytes m a
-> RunMiniProtocol
     'InitiatorMode initiatorCtx responderCtx bytes m a Void
InitiatorProtocolOnly ((ExpandedInitiatorContext addr m
 -> Channel m ByteString -> m (a, Maybe ByteString))
-> MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb (ClientApp addr m a
aPeerSharingClient NodeToNodeVersion
version))
    })
    NodeToNodeVersion
version

initiatorAndResponderProtocols
  :: LimitsAndTimeouts crypto addr
  -> Apps addr m a b
  -> NodeToNodeVersion
  -> NodeToNodeVersionData
  -> OuroborosBundleWithExpandedCtx 'InitiatorResponderMode addr BL.ByteString m a b
initiatorAndResponderProtocols :: forall crypto addr (m :: * -> *) a b.
LimitsAndTimeouts crypto addr
-> Apps addr m a b
-> NodeToNodeVersion
-> NodeToNodeVersionData
-> OuroborosBundleWithExpandedCtx
     'InitiatorResponderMode addr ByteString m a b
initiatorAndResponderProtocols LimitsAndTimeouts crypto addr
limitsAndTimeouts
                               Apps {
                                 ClientApp addr m a
aSigSubmissionClient :: forall addr (m :: * -> *) a b.
Apps addr m a b -> ClientApp addr m a
aSigSubmissionClient :: ClientApp addr m a
aSigSubmissionClient
                               , ServerApp addr m b
aSigSubmissionServer :: forall addr (m :: * -> *) a b.
Apps addr m a b -> ServerApp addr m b
aSigSubmissionServer :: ServerApp addr m b
aSigSubmissionServer
                               , ClientApp addr m a
aKeepAliveClient :: forall addr (m :: * -> *) a b.
Apps addr m a b -> ClientApp addr m a
aKeepAliveClient :: ClientApp addr m a
aKeepAliveClient
                               , ServerApp addr m b
aKeepAliveServer :: forall addr (m :: * -> *) a b.
Apps addr m a b -> ServerApp addr m b
aKeepAliveServer :: ServerApp addr m b
aKeepAliveServer
                               , ClientApp addr m a
aPeerSharingClient :: forall addr (m :: * -> *) a b.
Apps addr m a b -> ClientApp addr m a
aPeerSharingClient :: ClientApp addr m a
aPeerSharingClient
                               , ServerApp addr m b
aPeerSharingServer :: forall addr (m :: * -> *) a b.
Apps addr m a b -> ServerApp addr m b
aPeerSharingServer :: ServerApp addr m b
aPeerSharingServer
                               }
                               NodeToNodeVersion
version =
  LimitsAndTimeouts crypto addr
-> Protocols
     'InitiatorResponderMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     b
-> NodeToNodeVersion
-> NodeToNodeVersionData
-> OuroborosBundle
     'InitiatorResponderMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     b
forall crypto addr (appType :: Mode) initiatorCtx responderCtx
       bytes (m :: * -> *) a b.
LimitsAndTimeouts crypto addr
-> Protocols appType initiatorCtx responderCtx bytes m a b
-> NodeToNodeVersion
-> NodeToNodeVersionData
-> OuroborosBundle appType initiatorCtx responderCtx bytes m a b
nodeToNodeProtocols
    LimitsAndTimeouts crypto addr
limitsAndTimeouts
    (Protocols {
      sigSubmissionProtocol :: RunMiniProtocol
  'InitiatorResponderMode
  (ExpandedInitiatorContext addr m)
  (ResponderContext addr)
  ByteString
  m
  a
  b
sigSubmissionProtocol =
        MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
-> MiniProtocolCb (ResponderContext addr) ByteString m b
-> RunMiniProtocol
     'InitiatorResponderMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     b
forall initiatorCtx bytes (m :: * -> *) a responderCtx b.
MiniProtocolCb initiatorCtx bytes m a
-> MiniProtocolCb responderCtx bytes m b
-> RunMiniProtocol
     'InitiatorResponderMode initiatorCtx responderCtx bytes m a b
InitiatorAndResponderProtocol
           ((ExpandedInitiatorContext addr m
 -> Channel m ByteString -> m (a, Maybe ByteString))
-> MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb (ClientApp addr m a
aSigSubmissionClient NodeToNodeVersion
version))
           ((ResponderContext addr
 -> Channel m ByteString -> m (b, Maybe ByteString))
-> MiniProtocolCb (ResponderContext addr) ByteString m b
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb (ServerApp addr m b
aSigSubmissionServer NodeToNodeVersion
version))
    , keepAliveProtocol :: RunMiniProtocol
  'InitiatorResponderMode
  (ExpandedInitiatorContext addr m)
  (ResponderContext addr)
  ByteString
  m
  a
  b
keepAliveProtocol =
        MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
-> MiniProtocolCb (ResponderContext addr) ByteString m b
-> RunMiniProtocol
     'InitiatorResponderMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     b
forall initiatorCtx bytes (m :: * -> *) a responderCtx b.
MiniProtocolCb initiatorCtx bytes m a
-> MiniProtocolCb responderCtx bytes m b
-> RunMiniProtocol
     'InitiatorResponderMode initiatorCtx responderCtx bytes m a b
InitiatorAndResponderProtocol
           ((ExpandedInitiatorContext addr m
 -> Channel m ByteString -> m (a, Maybe ByteString))
-> MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb (ClientApp addr m a
aKeepAliveClient NodeToNodeVersion
version))
           ((ResponderContext addr
 -> Channel m ByteString -> m (b, Maybe ByteString))
-> MiniProtocolCb (ResponderContext addr) ByteString m b
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb (ServerApp addr m b
aKeepAliveServer NodeToNodeVersion
version))
    , peerSharingProtocol :: RunMiniProtocol
  'InitiatorResponderMode
  (ExpandedInitiatorContext addr m)
  (ResponderContext addr)
  ByteString
  m
  a
  b
peerSharingProtocol =
        MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
-> MiniProtocolCb (ResponderContext addr) ByteString m b
-> RunMiniProtocol
     'InitiatorResponderMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     b
forall initiatorCtx bytes (m :: * -> *) a responderCtx b.
MiniProtocolCb initiatorCtx bytes m a
-> MiniProtocolCb responderCtx bytes m b
-> RunMiniProtocol
     'InitiatorResponderMode initiatorCtx responderCtx bytes m a b
InitiatorAndResponderProtocol
           ((ExpandedInitiatorContext addr m
 -> Channel m ByteString -> m (a, Maybe ByteString))
-> MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb (ClientApp addr m a
aPeerSharingClient NodeToNodeVersion
version))
           ((ResponderContext addr
 -> Channel m ByteString -> m (b, Maybe ByteString))
-> MiniProtocolCb (ResponderContext addr) ByteString m b
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb (ServerApp addr m b
aPeerSharingServer NodeToNodeVersion
version))
    })
    NodeToNodeVersion
version

data Codecs crypto addr m =
  Codecs {
    forall crypto addr (m :: * -> *).
Codecs crypto addr m
-> AnnotatedCodec
     (SigSubmission crypto) DeserialiseFailure m ByteString
sigSubmissionCodec :: AnnotatedCodec (SigSubmission crypto)
                            CBOR.DeserialiseFailure m BL.ByteString
  , forall crypto addr (m :: * -> *).
Codecs crypto addr m
-> Codec KeepAlive DeserialiseFailure m ByteString
keepAliveCodec     :: Codec KeepAlive
                            CBOR.DeserialiseFailure m BL.ByteString
  , forall crypto addr (m :: * -> *).
Codecs crypto addr m
-> Codec (PeerSharing addr) DeserialiseFailure m ByteString
peerSharingCodec   :: Codec (Protocol.PeerSharing addr)
                            CBOR.DeserialiseFailure m BL.ByteString
  }

dmqCodecs :: ( Crypto crypto
             , MonadST m
             )
          => (addr -> CBOR.Encoding)
          -> (forall s. CBOR.Decoder s addr)
          -> Codecs crypto addr m
dmqCodecs :: forall crypto (m :: * -> *) addr.
(Crypto crypto, MonadST m) =>
(addr -> Encoding)
-> (forall s. Decoder s addr) -> Codecs crypto addr m
dmqCodecs addr -> Encoding
encodeAddr forall s. Decoder s addr
decodeAddr =
  Codecs {
    sigSubmissionCodec :: AnnotatedCodec
  (SigSubmission crypto) DeserialiseFailure m ByteString
sigSubmissionCodec = AnnotatedCodec
  (SigSubmission crypto) DeserialiseFailure m ByteString
forall crypto (m :: * -> *).
(Crypto crypto, MonadST m) =>
AnnotatedCodec
  (SigSubmission crypto) DeserialiseFailure m ByteString
codecSigSubmission
  , keepAliveCodec :: Codec KeepAlive DeserialiseFailure m ByteString
keepAliveCodec     = Codec KeepAlive DeserialiseFailure m ByteString
forall (m :: * -> *).
MonadST m =>
Codec KeepAlive DeserialiseFailure m ByteString
codecKeepAlive_v2
  , peerSharingCodec :: Codec (PeerSharing addr) DeserialiseFailure m ByteString
peerSharingCodec   = (addr -> Encoding)
-> (forall s. Decoder s addr)
-> Codec (PeerSharing addr) DeserialiseFailure m ByteString
forall (m :: * -> *) peerAddress.
MonadST m =>
(peerAddress -> Encoding)
-> (forall s. Decoder s peerAddress)
-> Codec (PeerSharing peerAddress) DeserialiseFailure m ByteString
codecPeerSharing addr -> Encoding
encodeAddr Decoder s addr
forall s. Decoder s addr
decodeAddr
  }

data LimitsAndTimeouts crypto addr =
  LimitsAndTimeouts {
    -- sig-submission
    forall crypto addr.
LimitsAndTimeouts crypto addr -> MiniProtocolLimits
sigSubmissionLimits
      :: MiniProtocolLimits
  , forall crypto addr.
LimitsAndTimeouts crypto addr
-> ProtocolSizeLimits (SigSubmission crypto) ByteString
sigSubmissionSizeLimits
      :: ProtocolSizeLimits (SigSubmission crypto) BL.ByteString
  , forall crypto addr.
LimitsAndTimeouts crypto addr
-> ProtocolTimeLimits (SigSubmission crypto)
sigSubmissionTimeLimits
      :: ProtocolTimeLimits (SigSubmission crypto)

    -- keep-alive
  , forall crypto addr.
LimitsAndTimeouts crypto addr -> MiniProtocolLimits
keepAliveLimits
      :: MiniProtocolLimits
  , forall crypto addr.
LimitsAndTimeouts crypto addr
-> ProtocolSizeLimits KeepAlive ByteString
keepAliveSizeLimits
      :: ProtocolSizeLimits KeepAlive BL.ByteString
  , forall crypto addr.
LimitsAndTimeouts crypto addr -> ProtocolTimeLimits KeepAlive
keepAliveTimeLimits
      :: ProtocolTimeLimits KeepAlive

    -- peer sharing
  , forall crypto addr.
LimitsAndTimeouts crypto addr -> MiniProtocolLimits
peerSharingLimits
      :: MiniProtocolLimits
  , forall crypto addr.
LimitsAndTimeouts crypto addr
-> ProtocolTimeLimits (PeerSharing addr)
peerSharingTimeLimits
      :: ProtocolTimeLimits (Protocol.PeerSharing addr)
  , forall crypto addr.
LimitsAndTimeouts crypto addr
-> ProtocolSizeLimits (PeerSharing addr) ByteString
peerSharingSizeLimits
      :: ProtocolSizeLimits (Protocol.PeerSharing addr) BL.ByteString
  }

dmqLimitsAndTimeouts :: LimitsAndTimeouts crypto addr
dmqLimitsAndTimeouts :: forall crypto addr. LimitsAndTimeouts crypto addr
dmqLimitsAndTimeouts =
    LimitsAndTimeouts {
      sigSubmissionLimits :: MiniProtocolLimits
sigSubmissionLimits =
        MiniProtocolLimits {
          -- TODO
          maximumIngressQueue :: Int
maximumIngressQueue = Int
forall a. Bounded a => a
maxBound
        }
    , sigSubmissionTimeLimits :: ProtocolTimeLimits (SigSubmission crypto)
sigSubmissionTimeLimits = ProtocolTimeLimits (SigSubmission crypto)
forall crypto. ProtocolTimeLimits (SigSubmission crypto)
timeLimitsSigSubmission
    , sigSubmissionSizeLimits :: ProtocolSizeLimits (SigSubmission crypto) ByteString
sigSubmissionSizeLimits = (ByteString -> Word)
-> ProtocolSizeLimits (SigSubmission crypto) ByteString
forall crypto bytes.
(bytes -> Word) -> ProtocolSizeLimits (SigSubmission crypto) bytes
byteLimitsSigSubmission ByteString -> Word
size

    , keepAliveLimits :: MiniProtocolLimits
keepAliveLimits     =
        MiniProtocolLimits {
          -- One small outstanding message.
          maximumIngressQueue :: Int
maximumIngressQueue = Int -> Int
addSafetyMargin Int
1280
        }

    , keepAliveTimeLimits :: ProtocolTimeLimits KeepAlive
keepAliveTimeLimits = ProtocolTimeLimits KeepAlive
timeLimitsKeepAlive
    , keepAliveSizeLimits :: ProtocolSizeLimits KeepAlive ByteString
keepAliveSizeLimits = (ByteString -> Word) -> ProtocolSizeLimits KeepAlive ByteString
forall bytes. (bytes -> Word) -> ProtocolSizeLimits KeepAlive bytes
byteLimitsKeepAlive ByteString -> Word
size

    , peerSharingLimits :: MiniProtocolLimits
peerSharingLimits   =
        MiniProtocolLimits {
          -- This protocol does not need to be pipelined and a peer can only ask
          -- for a maximum of 255 peers each time. Hence a reply can have up to
          -- 255 IP (IPv4 or IPv6) addresses so 255 * 16 = 4080. TCP has an initial
          -- window size of 4 and a TCP segment is 1440, which gives us 4 * 1440 =
          -- 5760 bytes to fit into a single RTT. So setting the maximum ingress
          -- queue to be a single RTT should be enough to cover for CBOR overhead.
          maximumIngressQueue :: Int
maximumIngressQueue = Int
4 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1440
        }
    , peerSharingTimeLimits :: ProtocolTimeLimits (PeerSharing addr)
peerSharingTimeLimits = ProtocolTimeLimits (PeerSharing addr)
forall peerAddress. ProtocolTimeLimits (PeerSharing peerAddress)
timeLimitsPeerSharing
    , peerSharingSizeLimits :: ProtocolSizeLimits (PeerSharing addr) ByteString
peerSharingSizeLimits = (ByteString -> Word)
-> ProtocolSizeLimits (PeerSharing addr) ByteString
forall peerAddress bytes.
(bytes -> Word)
-> ProtocolSizeLimits (PeerSharing peerAddress) bytes
byteLimitsPeerSharing ByteString -> Word
size
    }
  where
    size :: BL.ByteString -> Word
    size :: ByteString -> Word
size = Int64 -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> Word) -> (ByteString -> Int64) -> ByteString -> Word
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Int64
BL.length


type HandshakeTr ntnAddr = Mx.WithBearer (ConnectionId ntnAddr) (TraceSendRecv (Handshake NodeToNodeVersion CBOR.Term))

ntnHandshakeArguments
  :: MonadST m
  => Tracer m (HandshakeTr ntnAddr)
  -> HandshakeArguments
      (ConnectionId ntnAddr)
      NodeToNodeVersion
      NodeToNodeVersionData
      m
ntnHandshakeArguments :: forall (m :: * -> *) ntnAddr.
MonadST m =>
Tracer m (HandshakeTr ntnAddr)
-> HandshakeArguments
     (ConnectionId ntnAddr) NodeToNodeVersion NodeToNodeVersionData m
ntnHandshakeArguments Tracer m (HandshakeTr ntnAddr)
tracer =
  HandshakeArguments {
    haHandshakeTracer :: Tracer m (HandshakeTr ntnAddr)
haHandshakeTracer  = Tracer m (HandshakeTr ntnAddr)
tracer
  , haBearerTracer :: Tracer m (WithBearer (ConnectionId ntnAddr) BearerTrace)
haBearerTracer     = Tracer m (WithBearer (ConnectionId ntnAddr) BearerTrace)
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer -- TODO
  , haHandshakeCodec :: Codec
  (Handshake NodeToNodeVersion Term) DeserialiseFailure m ByteString
haHandshakeCodec   = CodecCBORTerm (Text, Maybe Int) NodeToNodeVersion
-> Codec
     (Handshake NodeToNodeVersion Term) DeserialiseFailure m ByteString
forall vNumber (m :: * -> *) failure.
(MonadST m, Ord vNumber, Show failure) =>
CodecCBORTerm (failure, Maybe Int) vNumber
-> Codec (Handshake vNumber Term) DeserialiseFailure m ByteString
codecHandshake CodecCBORTerm (Text, Maybe Int) NodeToNodeVersion
nodeToNodeVersionCodec
  , haVersionDataCodec :: VersionDataCodec Term NodeToNodeVersion NodeToNodeVersionData
haVersionDataCodec = (NodeToNodeVersion -> CodecCBORTerm Text NodeToNodeVersionData)
-> VersionDataCodec Term NodeToNodeVersion NodeToNodeVersionData
forall vNumber vData.
(vNumber -> CodecCBORTerm Text vData)
-> VersionDataCodec Term vNumber vData
cborTermVersionDataCodec NodeToNodeVersion -> CodecCBORTerm Text NodeToNodeVersionData
nodeToNodeCodecCBORTerm
  , haAcceptVersion :: NodeToNodeVersionData
-> NodeToNodeVersionData -> Accept NodeToNodeVersionData
haAcceptVersion    = NodeToNodeVersionData
-> NodeToNodeVersionData -> Accept NodeToNodeVersionData
forall v. Acceptable v => v -> v -> Accept v
acceptableVersion
  , haQueryVersion :: NodeToNodeVersionData -> Bool
haQueryVersion     = NodeToNodeVersionData -> Bool
forall v. Queryable v => v -> Bool
queryVersion
  , haTimeLimits :: ProtocolTimeLimits (Handshake NodeToNodeVersion Term)
haTimeLimits       = ProtocolTimeLimits (Handshake NodeToNodeVersion Term)
forall {k} (vNumber :: k).
ProtocolTimeLimits (Handshake vNumber Term)
timeLimitsHandshake
  }

stdVersionDataNTN :: NetworkMagic
                  -> DiffusionMode
                  -> PeerSharing
                  -> NodeToNodeVersionData
stdVersionDataNTN :: NetworkMagic
-> DiffusionMode -> PeerSharing -> NodeToNodeVersionData
stdVersionDataNTN NetworkMagic
networkMagic DiffusionMode
diffusionMode PeerSharing
peerSharing =
  NodeToNodeVersionData
    { NetworkMagic
networkMagic :: NetworkMagic
networkMagic :: NetworkMagic
networkMagic
    , DiffusionMode
diffusionMode :: DiffusionMode
diffusionMode :: DiffusionMode
diffusionMode
    , PeerSharing
peerSharing :: PeerSharing
peerSharing :: PeerSharing
peerSharing
    , query :: Bool
query = Bool
False
    }

-- TODO: choose wisely, is a protocol parameter.
_MAX_SIGS_TO_ACK :: NumTxIdsToAck
_MAX_SIGS_TO_ACK :: NumTxIdsToAck
_MAX_SIGS_TO_ACK = NumTxIdsToAck
20

_SIG_SUBMISSION_INIT_DELAY :: TxSubmissionInitDelay
_SIG_SUBMISSION_INIT_DELAY :: TxSubmissionInitDelay
_SIG_SUBMISSION_INIT_DELAY = TxSubmissionInitDelay
NoTxSubmissionInitDelay


-- TODO: this is duplicated code, similar function is in
-- `Cardano.Network.NodeToNode` module.
addSafetyMargin :: Int -> Int
addSafetyMargin :: Int -> Int
addSafetyMargin Int
x = Int
x Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
x Int -> Int -> Int
forall a. Integral a => a -> a -> a
`div` Int
10