{-# LANGUAGE DataKinds           #-}
{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE OverloadedStrings   #-}
{-# LANGUAGE RankNTypes          #-}
{-# LANGUAGE ScopedTypeVariables #-}

module DMQ.NodeToNode
  ( RemoteAddress
  , module DMQ.NodeToNode.Version
  , ClientApp
  , ServerApp
  , Apps (..)
  , ntnApps
  , Protocols (..)
  , nodeToNodeProtocols
  , initiatorProtocols
  , initiatorAndResponderProtocols
  , dmqCodecs
  , LimitsAndTimeouts
  , dmqLimitsAndTimeouts
  , HandshakeTr
  , ntnHandshakeArguments
  , stdVersionDataNTN
  ) where


import Control.Applicative (Alternative)
import Control.Concurrent.Class.MonadMVar.Strict
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadST
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer, nullTracer)

import Codec.CBOR.Decoding qualified as CBOR
import Codec.CBOR.Encoding qualified as CBOR
import Codec.CBOR.Read qualified as CBOR
import Codec.CBOR.Term qualified as CBOR
import Data.Aeson qualified as Aeson
import Data.ByteString.Lazy qualified as BL
import Data.Functor.Contravariant ((>$<))
import Data.Hashable (Hashable)
import Data.Void (Void)
import System.Random (mkStdGen)

import Network.Mux.Trace qualified as Mx
import Network.Mux.Types (Mode (..))
import Network.Mux.Types qualified as Mx
import Network.TypedProtocol.Codec (Codec)

import DMQ.Configuration (Configuration, Configuration' (..), I (..))
import DMQ.Diffusion.NodeKernel (NodeKernel (..))
import DMQ.NodeToNode.Version
import DMQ.Protocol.SigSubmission.Codec
import DMQ.Protocol.SigSubmission.Type
import DMQ.Tracer

import Ouroboros.Network.BlockFetch.ClientRegistry (bracketKeepAliveClient)
import Ouroboros.Network.Channel (Channel)
import Ouroboros.Network.ConnectionId (ConnectionId (..))
import Ouroboros.Network.Context (ExpandedInitiatorContext (..),
           ResponderContext (..))
import Ouroboros.Network.Driver.Limits (runPeerWithLimits,
           runPipelinedPeerWithLimits)
import Ouroboros.Network.Driver.Simple (TraceSendRecv)
import Ouroboros.Network.Handshake.Acceptable (Acceptable (..))
import Ouroboros.Network.Handshake.Queryable (Queryable (..))
import Ouroboros.Network.KeepAlive (KeepAliveInterval (..), keepAliveClient,
           keepAliveServer)
import Ouroboros.Network.Magic (NetworkMagic (..))
import Ouroboros.Network.Mux (MiniProtocol (..), MiniProtocolCb (..),
           MiniProtocolLimits (..), OuroborosBundle,
           OuroborosBundleWithExpandedCtx, RunMiniProtocol (..),
           StartOnDemandOrEagerly (..), TemperatureBundle (..),
           WithProtocolTemperature (..))
import Ouroboros.Network.NodeToNode.Version (DiffusionMode (..))
import Ouroboros.Network.PeerSelection (PeerSharing (..))
import Ouroboros.Network.PeerSharing (bracketPeerSharingClient,
           peerSharingClient, peerSharingServer)
import Ouroboros.Network.Snocket (RemoteAddress)
import Ouroboros.Network.TxSubmission.Inbound.V2 as SigSubmission
import Ouroboros.Network.TxSubmission.Mempool.Simple qualified as Mempool
import Ouroboros.Network.TxSubmission.Outbound

import Ouroboros.Network.OrphanInstances ()

import Ouroboros.Network.Protocol.Handshake (Handshake, HandshakeArguments (..))
import Ouroboros.Network.Protocol.Handshake.Codec (cborTermVersionDataCodec,
           codecHandshake, timeLimitsHandshake)
import Ouroboros.Network.Protocol.KeepAlive.Client (keepAliveClientPeer)
import Ouroboros.Network.Protocol.KeepAlive.Codec (byteLimitsKeepAlive,
           codecKeepAlive_v2, timeLimitsKeepAlive)
import Ouroboros.Network.Protocol.KeepAlive.Server (keepAliveServerPeer)
import Ouroboros.Network.Protocol.KeepAlive.Type (KeepAlive)
import Ouroboros.Network.Protocol.Limits (ProtocolSizeLimits,
           ProtocolTimeLimits)
import Ouroboros.Network.Protocol.PeerSharing.Client (peerSharingClientPeer)
import Ouroboros.Network.Protocol.PeerSharing.Codec (byteLimitsPeerSharing,
           codecPeerSharing, timeLimitsPeerSharing)
import Ouroboros.Network.Protocol.PeerSharing.Server (peerSharingServerPeer)
import Ouroboros.Network.Protocol.PeerSharing.Type qualified as Protocol
import Ouroboros.Network.Protocol.TxSubmission2.Client (txSubmissionClientPeer)
import Ouroboros.Network.Protocol.TxSubmission2.Server
           (txSubmissionServerPeerPipelined)

-- TODO: if we add `versionNumber` to `ctx` we could use `RunMiniProtocolCb`.
-- This makes sense, since `ctx` already contains `versionData`.
type ClientApp addr m a =
     NodeToNodeVersion
  -> ExpandedInitiatorContext addr m
  -> Channel m BL.ByteString
  -> m (a, Maybe BL.ByteString)

type ServerApp addr m a =
     NodeToNodeVersion
  -> ResponderContext addr
  -> Channel m BL.ByteString
  -> m (a, Maybe BL.ByteString)

data Apps addr m a b =
  Apps {
    -- | Start a sig-submission client
    forall addr (m :: * -> *) a b.
Apps addr m a b -> ClientApp addr m a
aSigSubmissionClient :: ClientApp addr m a

    -- | Start a sig-submission server
  , forall addr (m :: * -> *) a b.
Apps addr m a b -> ServerApp addr m b
aSigSubmissionServer :: ServerApp addr m b

    -- | Start a keep-alive client.
  , forall addr (m :: * -> *) a b.
Apps addr m a b -> ClientApp addr m a
aKeepAliveClient     :: ClientApp addr m a

    -- | Start a keep-alive server.
  , forall addr (m :: * -> *) a b.
Apps addr m a b -> ServerApp addr m b
aKeepAliveServer     :: ServerApp addr m b

    -- | Start a peer-sharing client.
  , forall addr (m :: * -> *) a b.
Apps addr m a b -> ClientApp addr m a
aPeerSharingClient   :: ClientApp addr m a

    -- | Start a peer-sharing server.
  , forall addr (m :: * -> *) a b.
Apps addr m a b -> ServerApp addr m b
aPeerSharingServer   :: ServerApp addr m b
  }

ntnApps
  :: forall m addr .
    ( Alternative (STM m)
    , MonadAsync m
    , MonadDelay m
    , MonadFork m
    , MonadMask m
    , MonadMVar m
    , MonadThrow (STM m)
    , MonadTimer m
    , Ord addr
    , Show addr
    , Hashable addr
    , Aeson.ToJSON addr
    )
 => (forall ev. Aeson.ToJSON ev => Tracer m (WithEventType ev))
 -> Configuration
 -> NodeKernel addr m
 -> Codecs addr m
 -> LimitsAndTimeouts addr
 -> TxDecisionPolicy
 -> Apps addr m () ()
ntnApps :: forall (m :: * -> *) addr.
(Alternative (STM m), MonadAsync m, MonadDelay m, MonadFork m,
 MonadMask m, MonadMVar m, MonadThrow (STM m), MonadTimer m,
 Ord addr, Show addr, Hashable addr, ToJSON addr) =>
(forall ev. ToJSON ev => Tracer m (WithEventType ev))
-> Configuration
-> NodeKernel addr m
-> Codecs addr m
-> LimitsAndTimeouts addr
-> TxDecisionPolicy
-> Apps addr m () ()
ntnApps
    forall ev. ToJSON ev => Tracer m (WithEventType ev)
tracer
    Configuration {
      dmqcSigSubmissionClientTracer :: forall (f :: * -> *). Configuration' f -> f Bool
dmqcSigSubmissionClientTracer = I Bool
sigSubmissionClientTracer
    , dmqcSigSubmissionServerTracer :: forall (f :: * -> *). Configuration' f -> f Bool
dmqcSigSubmissionServerTracer = I Bool
sigSubmissionServerTracer
    , dmqcKeepAliveClientTracer :: forall (f :: * -> *). Configuration' f -> f Bool
dmqcKeepAliveClientTracer     = I Bool
keepAliveClientTracer
    , dmqcKeepAliveServerTracer :: forall (f :: * -> *). Configuration' f -> f Bool
dmqcKeepAliveServerTracer     = I Bool
keepAliveServerTracer
    , dmqcPeerSharingClientTracer :: forall (f :: * -> *). Configuration' f -> f Bool
dmqcPeerSharingClientTracer   = I Bool
peerSharingClientTracer
    , dmqcPeerSharingServerTracer :: forall (f :: * -> *). Configuration' f -> f Bool
dmqcPeerSharingServerTracer   = I Bool
peerSharingServerTracer
    }
    NodeKernel {
      FetchClientRegistry (ConnectionId addr) () () m
fetchClientRegistry :: FetchClientRegistry (ConnectionId addr) () () m
fetchClientRegistry :: forall ntnAddr (m :: * -> *).
NodeKernel ntnAddr m
-> FetchClientRegistry (ConnectionId ntnAddr) () () m
fetchClientRegistry
    , PeerSharingRegistry addr m
peerSharingRegistry :: PeerSharingRegistry addr m
peerSharingRegistry :: forall ntnAddr (m :: * -> *).
NodeKernel ntnAddr m -> PeerSharingRegistry ntnAddr m
peerSharingRegistry
    , PeerSharingAPI addr StdGen m
peerSharingAPI :: PeerSharingAPI addr StdGen m
peerSharingAPI :: forall ntnAddr (m :: * -> *).
NodeKernel ntnAddr m -> PeerSharingAPI ntnAddr StdGen m
peerSharingAPI
    , Mempool m Sig
mempool :: Mempool m Sig
mempool :: forall ntnAddr (m :: * -> *). NodeKernel ntnAddr m -> Mempool m Sig
mempool
    , TxChannelsVar m addr SigId Sig
sigChannelVar :: TxChannelsVar m addr SigId Sig
sigChannelVar :: forall ntnAddr (m :: * -> *).
NodeKernel ntnAddr m -> TxChannelsVar m ntnAddr SigId Sig
sigChannelVar
    , TxMempoolSem m
sigMempoolSem :: TxMempoolSem m
sigMempoolSem :: forall ntnAddr (m :: * -> *).
NodeKernel ntnAddr m -> TxMempoolSem m
sigMempoolSem
    , SharedTxStateVar m addr SigId Sig
sigSharedTxStateVar :: SharedTxStateVar m addr SigId Sig
sigSharedTxStateVar :: forall ntnAddr (m :: * -> *).
NodeKernel ntnAddr m -> SharedTxStateVar m ntnAddr SigId Sig
sigSharedTxStateVar
    }
    Codecs {
      Codec SigSubmission DeserialiseFailure m ByteString
sigSubmissionCodec :: Codec SigSubmission DeserialiseFailure m ByteString
sigSubmissionCodec :: forall addr (m :: * -> *).
Codecs addr m
-> Codec SigSubmission DeserialiseFailure m ByteString
sigSubmissionCodec
    , Codec KeepAlive DeserialiseFailure m ByteString
keepAliveCodec :: Codec KeepAlive DeserialiseFailure m ByteString
keepAliveCodec :: forall addr (m :: * -> *).
Codecs addr m -> Codec KeepAlive DeserialiseFailure m ByteString
keepAliveCodec
    , Codec (PeerSharing addr) DeserialiseFailure m ByteString
peerSharingCodec :: Codec (PeerSharing addr) DeserialiseFailure m ByteString
peerSharingCodec :: forall addr (m :: * -> *).
Codecs addr m
-> Codec (PeerSharing addr) DeserialiseFailure m ByteString
peerSharingCodec
    }
    LimitsAndTimeouts {
      ProtocolSizeLimits SigSubmission ByteString
sigSubmissionSizeLimits :: ProtocolSizeLimits SigSubmission ByteString
sigSubmissionSizeLimits :: forall addr.
LimitsAndTimeouts addr
-> ProtocolSizeLimits SigSubmission ByteString
sigSubmissionSizeLimits
    , ProtocolTimeLimits SigSubmission
sigSubmissionTimeLimits :: ProtocolTimeLimits SigSubmission
sigSubmissionTimeLimits :: forall addr.
LimitsAndTimeouts addr -> ProtocolTimeLimits SigSubmission
sigSubmissionTimeLimits
    , ProtocolSizeLimits KeepAlive ByteString
keepAliveSizeLimits :: ProtocolSizeLimits KeepAlive ByteString
keepAliveSizeLimits :: forall addr.
LimitsAndTimeouts addr -> ProtocolSizeLimits KeepAlive ByteString
keepAliveSizeLimits
    , ProtocolTimeLimits KeepAlive
keepAliveTimeLimits :: ProtocolTimeLimits KeepAlive
keepAliveTimeLimits :: forall addr. LimitsAndTimeouts addr -> ProtocolTimeLimits KeepAlive
keepAliveTimeLimits
    , ProtocolTimeLimits (PeerSharing addr)
peerSharingTimeLimits :: ProtocolTimeLimits (PeerSharing addr)
peerSharingTimeLimits :: forall addr.
LimitsAndTimeouts addr -> ProtocolTimeLimits (PeerSharing addr)
peerSharingTimeLimits
    , ProtocolSizeLimits (PeerSharing addr) ByteString
peerSharingSizeLimits :: ProtocolSizeLimits (PeerSharing addr) ByteString
peerSharingSizeLimits :: forall addr.
LimitsAndTimeouts addr
-> ProtocolSizeLimits (PeerSharing addr) ByteString
peerSharingSizeLimits
    }
    TxDecisionPolicy
sigDecisionPolicy
    =
    Apps {
      ClientApp addr m ()
aSigSubmissionClient :: ClientApp addr m ()
aSigSubmissionClient :: ClientApp addr m ()
aSigSubmissionClient
    , ServerApp addr m ()
aSigSubmissionServer :: ServerApp addr m ()
aSigSubmissionServer :: ServerApp addr m ()
aSigSubmissionServer
    , ClientApp addr m ()
aKeepAliveClient :: ClientApp addr m ()
aKeepAliveClient :: ClientApp addr m ()
aKeepAliveClient
    , ServerApp addr m ()
aKeepAliveServer :: ServerApp addr m ()
aKeepAliveServer :: ServerApp addr m ()
aKeepAliveServer
    , ClientApp addr m ()
aPeerSharingClient :: ClientApp addr m ()
aPeerSharingClient :: ClientApp addr m ()
aPeerSharingClient
    , ServerApp addr m ()
aPeerSharingServer :: ServerApp addr m ()
aPeerSharingServer :: ServerApp addr m ()
aPeerSharingServer
    }
  where
    sigSize :: Sig -> SizeInBytes
    sigSize :: Sig -> SizeInBytes
sigSize Sig
_ = SizeInBytes
0 -- TODO

    mempoolReader :: TxSubmissionMempoolReader SigId Sig Int m
mempoolReader = (Sig -> SigId)
-> (Sig -> SizeInBytes)
-> Mempool m Sig
-> TxSubmissionMempoolReader SigId Sig Int m
forall tx txid (m :: * -> *).
(MonadSTM m, Eq txid) =>
(tx -> txid)
-> (tx -> SizeInBytes)
-> Mempool m tx
-> TxSubmissionMempoolReader txid tx Int m
Mempool.getReader Sig -> SigId
sigId Sig -> SizeInBytes
sigSize Mempool m Sig
mempool
    mempoolWriter :: TxSubmissionMempoolWriter SigId Sig Int m
mempoolWriter = (Sig -> SigId)
-> (Sig -> Bool)
-> Mempool m Sig
-> TxSubmissionMempoolWriter SigId Sig Int m
forall tx txid (m :: * -> *).
(MonadSTM m, Ord txid) =>
(tx -> txid)
-> (tx -> Bool)
-> Mempool m tx
-> TxSubmissionMempoolWriter txid tx Int m
Mempool.getWriter Sig -> SigId
sigId Sig -> Bool
sigValid Mempool m Sig
mempool
      where
        -- Note: invalid signatures are just omitted from the mempool. For DMQ
        -- we need to validate signatures when we received them, and shutdown
        -- connection if we receive one, rather than validate them in the
        -- mempool.
        sigValid :: Sig -> Bool
        sigValid :: Sig -> Bool
sigValid Sig
_ = Bool
True

    aSigSubmissionClient
      :: NodeToNodeVersion
      -> ExpandedInitiatorContext addr m
      -> Channel m BL.ByteString
      -> m ((), Maybe BL.ByteString)
    aSigSubmissionClient :: ClientApp addr m ()
aSigSubmissionClient NodeToNodeVersion
version
                         ExpandedInitiatorContext {
                           eicConnectionId :: forall addr (m :: * -> *).
ExpandedInitiatorContext addr m -> ConnectionId addr
eicConnectionId   = ConnectionId addr
connId,
                           eicControlMessage :: forall addr (m :: * -> *).
ExpandedInitiatorContext addr m -> ControlMessageSTM m
eicControlMessage = ControlMessageSTM m
controlMessage
                         } Channel m ByteString
channel =
      Tracer m (TraceSendRecv SigSubmission)
-> Codec SigSubmission DeserialiseFailure m ByteString
-> ProtocolSizeLimits SigSubmission ByteString
-> ProtocolTimeLimits SigSubmission
-> Channel m ByteString
-> Peer SigSubmission 'AsClient 'NonPipelined 'StInit m ()
-> m ((), Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
       a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
 MonadTimer m, ShowProxy ps,
 forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
 Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runPeerWithLimits
        (if Bool
sigSubmissionClientTracer
          then String
-> WithBearer (ConnectionId addr) (TraceSendRecv SigSubmission)
-> WithEventType
     (WithBearer (ConnectionId addr) (TraceSendRecv SigSubmission))
forall a. String -> a -> WithEventType a
WithEventType String
"SigSubmissionClient" (WithBearer (ConnectionId addr) (TraceSendRecv SigSubmission)
 -> WithEventType
      (WithBearer (ConnectionId addr) (TraceSendRecv SigSubmission)))
-> (TraceSendRecv SigSubmission
    -> WithBearer (ConnectionId addr) (TraceSendRecv SigSubmission))
-> TraceSendRecv SigSubmission
-> WithEventType
     (WithBearer (ConnectionId addr) (TraceSendRecv SigSubmission))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionId addr
-> TraceSendRecv SigSubmission
-> WithBearer (ConnectionId addr) (TraceSendRecv SigSubmission)
forall peerid a. peerid -> a -> WithBearer peerid a
Mx.WithBearer ConnectionId addr
connId (TraceSendRecv SigSubmission
 -> WithEventType
      (WithBearer (ConnectionId addr) (TraceSendRecv SigSubmission)))
-> Tracer
     m
     (WithEventType
        (WithBearer (ConnectionId addr) (TraceSendRecv SigSubmission)))
-> Tracer m (TraceSendRecv SigSubmission)
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< Tracer
  m
  (WithEventType
     (WithBearer (ConnectionId addr) (TraceSendRecv SigSubmission)))
forall ev. ToJSON ev => Tracer m (WithEventType ev)
tracer
          else Tracer m (TraceSendRecv SigSubmission)
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer)
        Codec SigSubmission DeserialiseFailure m ByteString
sigSubmissionCodec
        ProtocolSizeLimits SigSubmission ByteString
sigSubmissionSizeLimits
        ProtocolTimeLimits SigSubmission
sigSubmissionTimeLimits
        Channel m ByteString
channel
        (Peer SigSubmission 'AsClient 'NonPipelined 'StInit m ()
 -> m ((), Maybe ByteString))
-> Peer SigSubmission 'AsClient 'NonPipelined 'StInit m ()
-> m ((), Maybe ByteString)
forall a b. (a -> b) -> a -> b
$ TxSubmissionClient SigId Sig m ()
-> Peer SigSubmission 'AsClient 'NonPipelined 'StInit m ()
forall txid tx (m :: * -> *) a.
Monad m =>
TxSubmissionClient txid tx m a
-> Client (TxSubmission2 txid tx) 'NonPipelined 'StInit m a
txSubmissionClientPeer
        (TxSubmissionClient SigId Sig m ()
 -> Peer SigSubmission 'AsClient 'NonPipelined 'StInit m ())
-> TxSubmissionClient SigId Sig m ()
-> Peer SigSubmission 'AsClient 'NonPipelined 'StInit m ()
forall a b. (a -> b) -> a -> b
$ Tracer m (TraceTxSubmissionOutbound SigId Sig)
-> NumTxIdsToAck
-> TxSubmissionMempoolReader SigId Sig Int m
-> NodeToNodeVersion
-> ControlMessageSTM m
-> TxSubmissionClient SigId Sig m ()
forall version txid tx idx (m :: * -> *).
(Ord txid, Ord idx, MonadSTM m, MonadThrow m) =>
Tracer m (TraceTxSubmissionOutbound txid tx)
-> NumTxIdsToAck
-> TxSubmissionMempoolReader txid tx idx m
-> version
-> ControlMessageSTM m
-> TxSubmissionClient txid tx m ()
txSubmissionOutbound
            Tracer m (TraceTxSubmissionOutbound SigId Sig)
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
            NumTxIdsToAck
_MAX_SIGS_TO_ACK
            TxSubmissionMempoolReader SigId Sig Int m
mempoolReader
            NodeToNodeVersion
version
            ControlMessageSTM m
controlMessage


    aSigSubmissionServer
      :: NodeToNodeVersion
      -> ResponderContext addr
      -> Channel m BL.ByteString
      -> m ((), Maybe BL.ByteString)
    aSigSubmissionServer :: ServerApp addr m ()
aSigSubmissionServer NodeToNodeVersion
_version ResponderContext { rcConnectionId :: forall addr. ResponderContext addr -> ConnectionId addr
rcConnectionId = ConnectionId addr
connId } Channel m ByteString
channel =
        Tracer m (TraceTxLogic addr SigId Sig)
-> TxChannelsVar m addr SigId Sig
-> TxMempoolSem m
-> TxDecisionPolicy
-> SharedTxStateVar m addr SigId Sig
-> TxSubmissionMempoolReader SigId Sig Int m
-> TxSubmissionMempoolWriter SigId Sig Int m
-> (Sig -> SizeInBytes)
-> addr
-> (PeerTxAPI m SigId Sig -> m ((), Maybe ByteString))
-> m ((), Maybe ByteString)
forall tx peeraddr txid idx (m :: * -> *) a.
(MonadMask m, MonadMVar m, MonadSTM m, MonadMonotonicTime m,
 Ord txid, Show txid, Typeable txid, Ord peeraddr, Show peeraddr) =>
Tracer m (TraceTxLogic peeraddr txid tx)
-> TxChannelsVar m peeraddr txid tx
-> TxMempoolSem m
-> TxDecisionPolicy
-> SharedTxStateVar m peeraddr txid tx
-> TxSubmissionMempoolReader txid tx idx m
-> TxSubmissionMempoolWriter txid tx idx m
-> (tx -> SizeInBytes)
-> peeraddr
-> (PeerTxAPI m txid tx -> m a)
-> m a
SigSubmission.withPeer
          Tracer m (TraceTxLogic addr SigId Sig)
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
          TxChannelsVar m addr SigId Sig
sigChannelVar
          TxMempoolSem m
sigMempoolSem
          TxDecisionPolicy
sigDecisionPolicy
          SharedTxStateVar m addr SigId Sig
sigSharedTxStateVar
          TxSubmissionMempoolReader SigId Sig Int m
mempoolReader
          TxSubmissionMempoolWriter SigId Sig Int m
mempoolWriter
          Sig -> SizeInBytes
sigSize
          (ConnectionId addr -> addr
forall addr. ConnectionId addr -> addr
remoteAddress ConnectionId addr
connId)
          ((PeerTxAPI m SigId Sig -> m ((), Maybe ByteString))
 -> m ((), Maybe ByteString))
-> (PeerTxAPI m SigId Sig -> m ((), Maybe ByteString))
-> m ((), Maybe ByteString)
forall a b. (a -> b) -> a -> b
$ \(PeerTxAPI m SigId Sig
peerSigAPI :: PeerTxAPI m SigId Sig) ->
            Tracer m (TraceSendRecv SigSubmission)
-> Codec SigSubmission DeserialiseFailure m ByteString
-> ProtocolSizeLimits SigSubmission ByteString
-> ProtocolTimeLimits SigSubmission
-> Channel m ByteString
-> PeerPipelined SigSubmission 'AsServer 'StInit m ()
-> m ((), Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
       a.
(MonadAsync m, MonadFork m, MonadMask m, MonadTimer m,
 MonadThrow (STM m), ShowProxy ps,
 forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
 Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> PeerPipelined ps pr st m a
-> m (a, Maybe bytes)
runPipelinedPeerWithLimits
              (if Bool
sigSubmissionServerTracer
                 then String
-> WithBearer (ConnectionId addr) (TraceSendRecv SigSubmission)
-> WithEventType
     (WithBearer (ConnectionId addr) (TraceSendRecv SigSubmission))
forall a. String -> a -> WithEventType a
WithEventType String
"SigSubmissionServer" (WithBearer (ConnectionId addr) (TraceSendRecv SigSubmission)
 -> WithEventType
      (WithBearer (ConnectionId addr) (TraceSendRecv SigSubmission)))
-> (TraceSendRecv SigSubmission
    -> WithBearer (ConnectionId addr) (TraceSendRecv SigSubmission))
-> TraceSendRecv SigSubmission
-> WithEventType
     (WithBearer (ConnectionId addr) (TraceSendRecv SigSubmission))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionId addr
-> TraceSendRecv SigSubmission
-> WithBearer (ConnectionId addr) (TraceSendRecv SigSubmission)
forall peerid a. peerid -> a -> WithBearer peerid a
Mx.WithBearer ConnectionId addr
connId (TraceSendRecv SigSubmission
 -> WithEventType
      (WithBearer (ConnectionId addr) (TraceSendRecv SigSubmission)))
-> Tracer
     m
     (WithEventType
        (WithBearer (ConnectionId addr) (TraceSendRecv SigSubmission)))
-> Tracer m (TraceSendRecv SigSubmission)
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< Tracer
  m
  (WithEventType
     (WithBearer (ConnectionId addr) (TraceSendRecv SigSubmission)))
forall ev. ToJSON ev => Tracer m (WithEventType ev)
tracer
                 else Tracer m (TraceSendRecv SigSubmission)
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer)
              Codec SigSubmission DeserialiseFailure m ByteString
sigSubmissionCodec
              ProtocolSizeLimits SigSubmission ByteString
sigSubmissionSizeLimits
              ProtocolTimeLimits SigSubmission
sigSubmissionTimeLimits
              Channel m ByteString
channel
              (PeerPipelined SigSubmission 'AsServer 'StInit m ()
 -> m ((), Maybe ByteString))
-> PeerPipelined SigSubmission 'AsServer 'StInit m ()
-> m ((), Maybe ByteString)
forall a b. (a -> b) -> a -> b
$ TxSubmissionServerPipelined SigId Sig m ()
-> PeerPipelined SigSubmission 'AsServer 'StInit m ()
forall txid tx (m :: * -> *) a.
Functor m =>
TxSubmissionServerPipelined txid tx m a
-> ServerPipelined (TxSubmission2 txid tx) 'StInit m a
txSubmissionServerPeerPipelined
              (TxSubmissionServerPipelined SigId Sig m ()
 -> PeerPipelined SigSubmission 'AsServer 'StInit m ())
-> TxSubmissionServerPipelined SigId Sig m ()
-> PeerPipelined SigSubmission 'AsServer 'StInit m ()
forall a b. (a -> b) -> a -> b
$ Tracer m (TraceTxSubmissionInbound SigId Sig)
-> TxSubmissionInitDelay
-> TxSubmissionMempoolWriter SigId Sig Int m
-> PeerTxAPI m SigId Sig
-> TxSubmissionServerPipelined SigId Sig m ()
forall txid tx idx (m :: * -> *).
(MonadDelay m, MonadThrow m, Ord txid) =>
Tracer m (TraceTxSubmissionInbound txid tx)
-> TxSubmissionInitDelay
-> TxSubmissionMempoolWriter txid tx idx m
-> PeerTxAPI m txid tx
-> TxSubmissionServerPipelined txid tx m ()
txSubmissionInboundV2
                  Tracer m (TraceTxSubmissionInbound SigId Sig)
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
                  TxSubmissionInitDelay
_SIG_SUBMISSION_INIT_DELAY
                  TxSubmissionMempoolWriter SigId Sig Int m
mempoolWriter
                  PeerTxAPI m SigId Sig
peerSigAPI


    aKeepAliveClient
      :: NodeToNodeVersion
      -> ExpandedInitiatorContext addr m
      -> Channel m BL.ByteString
      -> m ((), Maybe BL.ByteString)
    aKeepAliveClient :: ClientApp addr m ()
aKeepAliveClient NodeToNodeVersion
_version
                     ExpandedInitiatorContext {
                       eicConnectionId :: forall addr (m :: * -> *).
ExpandedInitiatorContext addr m -> ConnectionId addr
eicConnectionId   = ConnectionId addr
connId
                     , eicControlMessage :: forall addr (m :: * -> *).
ExpandedInitiatorContext addr m -> ControlMessageSTM m
eicControlMessage = ControlMessageSTM m
controlMessageSTM
                     }
                     Channel m ByteString
channel = do
      String -> m ()
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread String
"KeepAliveClient"
      let kacApp :: StrictTVar m (Map (ConnectionId addr) PeerGSV)
-> m ((), Maybe ByteString)
kacApp StrictTVar m (Map (ConnectionId addr) PeerGSV)
dqCtx =
            Tracer m (TraceSendRecv KeepAlive)
-> Codec KeepAlive DeserialiseFailure m ByteString
-> ProtocolSizeLimits KeepAlive ByteString
-> ProtocolTimeLimits KeepAlive
-> Channel m ByteString
-> Peer KeepAlive 'AsClient 'NonPipelined 'StClient m ()
-> m ((), Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
       a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
 MonadTimer m, ShowProxy ps,
 forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
 Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runPeerWithLimits
              (if Bool
keepAliveClientTracer
                 then String
-> WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive)
-> WithEventType
     (WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive))
forall a. String -> a -> WithEventType a
WithEventType String
"KeepAliveClient" (WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive)
 -> WithEventType
      (WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive)))
-> (TraceSendRecv KeepAlive
    -> WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive))
-> TraceSendRecv KeepAlive
-> WithEventType
     (WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionId addr
-> TraceSendRecv KeepAlive
-> WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive)
forall peerid a. peerid -> a -> WithBearer peerid a
Mx.WithBearer ConnectionId addr
connId (TraceSendRecv KeepAlive
 -> WithEventType
      (WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive)))
-> Tracer
     m
     (WithEventType
        (WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive)))
-> Tracer m (TraceSendRecv KeepAlive)
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< Tracer
  m
  (WithEventType
     (WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive)))
forall ev. ToJSON ev => Tracer m (WithEventType ev)
tracer
                 else Tracer m (TraceSendRecv KeepAlive)
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer)
              Codec KeepAlive DeserialiseFailure m ByteString
keepAliveCodec
              ProtocolSizeLimits KeepAlive ByteString
keepAliveSizeLimits
              ProtocolTimeLimits KeepAlive
keepAliveTimeLimits
              Channel m ByteString
channel
              (Peer KeepAlive 'AsClient 'NonPipelined 'StClient m ()
 -> m ((), Maybe ByteString))
-> Peer KeepAlive 'AsClient 'NonPipelined 'StClient m ()
-> m ((), Maybe ByteString)
forall a b. (a -> b) -> a -> b
$ KeepAliveClient m ()
-> Peer KeepAlive 'AsClient 'NonPipelined 'StClient m ()
forall (m :: * -> *) a.
MonadThrow m =>
KeepAliveClient m a -> Client KeepAlive 'NonPipelined 'StClient m a
keepAliveClientPeer
              (KeepAliveClient m ()
 -> Peer KeepAlive 'AsClient 'NonPipelined 'StClient m ())
-> KeepAliveClient m ()
-> Peer KeepAlive 'AsClient 'NonPipelined 'StClient m ()
forall a b. (a -> b) -> a -> b
$ Tracer m (TraceKeepAliveClient (ConnectionId addr))
-> StdGen
-> ControlMessageSTM m
-> ConnectionId addr
-> StrictTVar m (Map (ConnectionId addr) PeerGSV)
-> KeepAliveInterval
-> KeepAliveClient m ()
forall (m :: * -> *) peer.
(MonadTimer m, Ord peer) =>
Tracer m (TraceKeepAliveClient peer)
-> StdGen
-> ControlMessageSTM m
-> peer
-> StrictTVar m (Map peer PeerGSV)
-> KeepAliveInterval
-> KeepAliveClient m ()
keepAliveClient Tracer m (TraceKeepAliveClient (ConnectionId addr))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
                                (Int -> StdGen
mkStdGen Int
0)
                                ControlMessageSTM m
controlMessageSTM
                                ConnectionId addr
connId
                                StrictTVar m (Map (ConnectionId addr) PeerGSV)
dqCtx
                                (DiffTime -> KeepAliveInterval
KeepAliveInterval DiffTime
10)

      ((), trailing) <- FetchClientRegistry (ConnectionId addr) () () m
-> ConnectionId addr
-> (StrictTVar m (Map (ConnectionId addr) PeerGSV)
    -> m ((), Maybe ByteString))
-> m ((), Maybe ByteString)
forall (m :: * -> *) a peer header block.
(MonadSTM m, MonadFork m, MonadMask m, Ord peer) =>
FetchClientRegistry peer header block m
-> peer -> (StrictTVar m (Map peer PeerGSV) -> m a) -> m a
bracketKeepAliveClient FetchClientRegistry (ConnectionId addr) () () m
fetchClientRegistry ConnectionId addr
connId StrictTVar m (Map (ConnectionId addr) PeerGSV)
-> m ((), Maybe ByteString)
kacApp
      return ((), trailing)

    aKeepAliveServer
      :: NodeToNodeVersion
      -> ResponderContext addr
      -> Channel m BL.ByteString
      -> m ((), Maybe BL.ByteString)
    aKeepAliveServer :: ServerApp addr m ()
aKeepAliveServer NodeToNodeVersion
_version
                     ResponderContext {
                       rcConnectionId :: forall addr. ResponderContext addr -> ConnectionId addr
rcConnectionId = ConnectionId addr
connId
                     }
                     Channel m ByteString
channel = do
      String -> m ()
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread String
"KeepAliveServer"
      Tracer m (TraceSendRecv KeepAlive)
-> Codec KeepAlive DeserialiseFailure m ByteString
-> ProtocolSizeLimits KeepAlive ByteString
-> ProtocolTimeLimits KeepAlive
-> Channel m ByteString
-> Peer KeepAlive 'AsServer 'NonPipelined 'StClient m ()
-> m ((), Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
       a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
 MonadTimer m, ShowProxy ps,
 forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
 Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runPeerWithLimits
        (if Bool
keepAliveServerTracer
           then String
-> WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive)
-> WithEventType
     (WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive))
forall a. String -> a -> WithEventType a
WithEventType String
"KeepAliveServer" (WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive)
 -> WithEventType
      (WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive)))
-> (TraceSendRecv KeepAlive
    -> WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive))
-> TraceSendRecv KeepAlive
-> WithEventType
     (WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionId addr
-> TraceSendRecv KeepAlive
-> WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive)
forall peerid a. peerid -> a -> WithBearer peerid a
Mx.WithBearer ConnectionId addr
connId (TraceSendRecv KeepAlive
 -> WithEventType
      (WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive)))
-> Tracer
     m
     (WithEventType
        (WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive)))
-> Tracer m (TraceSendRecv KeepAlive)
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< Tracer
  m
  (WithEventType
     (WithBearer (ConnectionId addr) (TraceSendRecv KeepAlive)))
forall ev. ToJSON ev => Tracer m (WithEventType ev)
tracer
           else Tracer m (TraceSendRecv KeepAlive)
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer)
        Codec KeepAlive DeserialiseFailure m ByteString
keepAliveCodec
        ProtocolSizeLimits KeepAlive ByteString
keepAliveSizeLimits
        ProtocolTimeLimits KeepAlive
keepAliveTimeLimits
        Channel m ByteString
channel
        (Peer KeepAlive 'AsServer 'NonPipelined 'StClient m ()
 -> m ((), Maybe ByteString))
-> Peer KeepAlive 'AsServer 'NonPipelined 'StClient m ()
-> m ((), Maybe ByteString)
forall a b. (a -> b) -> a -> b
$ KeepAliveServer m ()
-> Peer KeepAlive 'AsServer 'NonPipelined 'StClient m ()
forall (m :: * -> *) a.
Functor m =>
KeepAliveServer m a -> Server KeepAlive 'NonPipelined 'StClient m a
keepAliveServerPeer
          KeepAliveServer m ()
forall (m :: * -> *). Applicative m => KeepAliveServer m ()
keepAliveServer

    aPeerSharingClient
      :: NodeToNodeVersion
      -> ExpandedInitiatorContext addr m
      -> Channel m BL.ByteString
      -> m ((), Maybe BL.ByteString)
    aPeerSharingClient :: ClientApp addr m ()
aPeerSharingClient NodeToNodeVersion
_version
                       ExpandedInitiatorContext {
                         eicConnectionId :: forall addr (m :: * -> *).
ExpandedInitiatorContext addr m -> ConnectionId addr
eicConnectionId   = ConnectionId addr
connId
                       , eicControlMessage :: forall addr (m :: * -> *).
ExpandedInitiatorContext addr m -> ControlMessageSTM m
eicControlMessage = ControlMessageSTM m
controlMessageSTM
                       }
                       Channel m ByteString
channel = do
      String -> m ()
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread String
"PeerSharingClient"
      PeerSharingRegistry addr m
-> addr
-> (PeerSharingController addr m -> m ((), Maybe ByteString))
-> m ((), Maybe ByteString)
forall peer (m :: * -> *) a.
(Ord peer, MonadSTM m, MonadThrow m) =>
PeerSharingRegistry peer m
-> peer -> (PeerSharingController peer m -> m a) -> m a
bracketPeerSharingClient PeerSharingRegistry addr m
peerSharingRegistry (ConnectionId addr -> addr
forall addr. ConnectionId addr -> addr
remoteAddress ConnectionId addr
connId)
        ((PeerSharingController addr m -> m ((), Maybe ByteString))
 -> m ((), Maybe ByteString))
-> (PeerSharingController addr m -> m ((), Maybe ByteString))
-> m ((), Maybe ByteString)
forall a b. (a -> b) -> a -> b
$ \PeerSharingController addr m
controller -> do
          psClient <- ControlMessageSTM m
-> PeerSharingController addr m -> m (PeerSharingClient addr m ())
forall (m :: * -> *) peer.
(Alternative (STM m), MonadMVar m, MonadSTM m, MonadThrow m) =>
ControlMessageSTM m
-> PeerSharingController peer m -> m (PeerSharingClient peer m ())
peerSharingClient ControlMessageSTM m
controlMessageSTM PeerSharingController addr m
controller
          ((), trailing) <- runPeerWithLimits
            (if peerSharingClientTracer
               then WithEventType "PeerSharingClient" . Mx.WithBearer connId >$< tracer
               else nullTracer)
            peerSharingCodec
            peerSharingSizeLimits
            peerSharingTimeLimits
            channel
            (peerSharingClientPeer psClient)
          return ((), trailing)

    aPeerSharingServer
      :: NodeToNodeVersion
      -> ResponderContext addr
      -> Channel m BL.ByteString
      -> m ((), Maybe BL.ByteString)
    aPeerSharingServer :: ServerApp addr m ()
aPeerSharingServer NodeToNodeVersion
_version
                       ResponderContext {
                         rcConnectionId :: forall addr. ResponderContext addr -> ConnectionId addr
rcConnectionId = ConnectionId addr
connId
                       }
                       Channel m ByteString
channel = do
      String -> m ()
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread String
"PeerSharingServer"
      Tracer m (TraceSendRecv (PeerSharing addr))
-> Codec (PeerSharing addr) DeserialiseFailure m ByteString
-> ProtocolSizeLimits (PeerSharing addr) ByteString
-> ProtocolTimeLimits (PeerSharing addr)
-> Channel m ByteString
-> Peer (PeerSharing addr) 'AsServer 'NonPipelined 'StIdle m ()
-> m ((), Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
       a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
 MonadTimer m, ShowProxy ps,
 forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
 Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runPeerWithLimits
        (if Bool
peerSharingServerTracer
           then String
-> WithBearer
     (ConnectionId addr) (TraceSendRecv (PeerSharing addr))
-> WithEventType
     (WithBearer (ConnectionId addr) (TraceSendRecv (PeerSharing addr)))
forall a. String -> a -> WithEventType a
WithEventType String
"PeerSharingServer" (WithBearer (ConnectionId addr) (TraceSendRecv (PeerSharing addr))
 -> WithEventType
      (WithBearer
         (ConnectionId addr) (TraceSendRecv (PeerSharing addr))))
-> (TraceSendRecv (PeerSharing addr)
    -> WithBearer
         (ConnectionId addr) (TraceSendRecv (PeerSharing addr)))
-> TraceSendRecv (PeerSharing addr)
-> WithEventType
     (WithBearer (ConnectionId addr) (TraceSendRecv (PeerSharing addr)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionId addr
-> TraceSendRecv (PeerSharing addr)
-> WithBearer
     (ConnectionId addr) (TraceSendRecv (PeerSharing addr))
forall peerid a. peerid -> a -> WithBearer peerid a
Mx.WithBearer ConnectionId addr
connId (TraceSendRecv (PeerSharing addr)
 -> WithEventType
      (WithBearer
         (ConnectionId addr) (TraceSendRecv (PeerSharing addr))))
-> Tracer
     m
     (WithEventType
        (WithBearer
           (ConnectionId addr) (TraceSendRecv (PeerSharing addr))))
-> Tracer m (TraceSendRecv (PeerSharing addr))
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< Tracer
  m
  (WithEventType
     (WithBearer
        (ConnectionId addr) (TraceSendRecv (PeerSharing addr))))
forall ev. ToJSON ev => Tracer m (WithEventType ev)
tracer
           else Tracer m (TraceSendRecv (PeerSharing addr))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer)
        Codec (PeerSharing addr) DeserialiseFailure m ByteString
peerSharingCodec
        ProtocolSizeLimits (PeerSharing addr) ByteString
peerSharingSizeLimits
        ProtocolTimeLimits (PeerSharing addr)
peerSharingTimeLimits
        Channel m ByteString
channel
        (Peer (PeerSharing addr) 'AsServer 'NonPipelined 'StIdle m ()
 -> m ((), Maybe ByteString))
-> Peer (PeerSharing addr) 'AsServer 'NonPipelined 'StIdle m ()
-> m ((), Maybe ByteString)
forall a b. (a -> b) -> a -> b
$ PeerSharingServer addr m
-> Peer (PeerSharing addr) 'AsServer 'NonPipelined 'StIdle m ()
forall (m :: * -> *) peerAddress.
Monad m =>
PeerSharingServer peerAddress m
-> Server (PeerSharing peerAddress) 'NonPipelined 'StIdle m ()
peerSharingServerPeer
        (PeerSharingServer addr m
 -> Peer (PeerSharing addr) 'AsServer 'NonPipelined 'StIdle m ())
-> PeerSharingServer addr m
-> Peer (PeerSharing addr) 'AsServer 'NonPipelined 'StIdle m ()
forall a b. (a -> b) -> a -> b
$ PeerSharingAPI addr StdGen m -> PeerSharingServer addr m
forall (m :: * -> *) peer s.
(MonadSTM m, MonadMonotonicTime m, Hashable peer, RandomGen s) =>
PeerSharingAPI peer s m -> PeerSharingServer peer m
peerSharingServer PeerSharingAPI addr StdGen m
peerSharingAPI


data Protocols appType initiatorCtx responderCtx bytes m a b =
  Protocols {
    forall (appType :: Mode) initiatorCtx responderCtx bytes
       (m :: * -> *) a b.
Protocols appType initiatorCtx responderCtx bytes m a b
-> RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
sigSubmissionProtocol :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b

    -- | keep-alive mini-protocol
    --
  , forall (appType :: Mode) initiatorCtx responderCtx bytes
       (m :: * -> *) a b.
Protocols appType initiatorCtx responderCtx bytes m a b
-> RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
keepAliveProtocol    :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b

    -- | peer sharing mini-protocol
    --
  , forall (appType :: Mode) initiatorCtx responderCtx bytes
       (m :: * -> *) a b.
Protocols appType initiatorCtx responderCtx bytes m a b
-> RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
peerSharingProtocol  :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
  }

sigSubmissionMiniProtocolNum :: Mx.MiniProtocolNum
sigSubmissionMiniProtocolNum :: MiniProtocolNum
sigSubmissionMiniProtocolNum = Word16 -> MiniProtocolNum
Mx.MiniProtocolNum Word16
11

keepAliveMiniProtocolNum :: Mx.MiniProtocolNum
keepAliveMiniProtocolNum :: MiniProtocolNum
keepAliveMiniProtocolNum = Word16 -> MiniProtocolNum
Mx.MiniProtocolNum Word16
12

peerSharingMiniProtocolNum :: Mx.MiniProtocolNum
peerSharingMiniProtocolNum :: MiniProtocolNum
peerSharingMiniProtocolNum = Word16 -> MiniProtocolNum
Mx.MiniProtocolNum Word16
13

nodeToNodeProtocols
  :: LimitsAndTimeouts addr
  -> Protocols appType initiatorCtx responderCtx bytes m a b
  -> NodeToNodeVersion
  -- ^ negotiated version number
  -> NodeToNodeVersionData
  -- ^ negotiated version data
  -> OuroborosBundle appType initiatorCtx responderCtx bytes m a b
nodeToNodeProtocols :: forall addr (appType :: Mode) initiatorCtx responderCtx bytes
       (m :: * -> *) a b.
LimitsAndTimeouts addr
-> Protocols appType initiatorCtx responderCtx bytes m a b
-> NodeToNodeVersion
-> NodeToNodeVersionData
-> OuroborosBundle appType initiatorCtx responderCtx bytes m a b
nodeToNodeProtocols LimitsAndTimeouts {
                      MiniProtocolLimits
sigSubmissionLimits :: MiniProtocolLimits
sigSubmissionLimits :: forall addr. LimitsAndTimeouts addr -> MiniProtocolLimits
sigSubmissionLimits
                    , MiniProtocolLimits
keepAliveLimits :: MiniProtocolLimits
keepAliveLimits :: forall addr. LimitsAndTimeouts addr -> MiniProtocolLimits
keepAliveLimits
                    , MiniProtocolLimits
peerSharingLimits :: MiniProtocolLimits
peerSharingLimits :: forall addr. LimitsAndTimeouts addr -> MiniProtocolLimits
peerSharingLimits
                    }
                    Protocols {
                      RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
sigSubmissionProtocol :: forall (appType :: Mode) initiatorCtx responderCtx bytes
       (m :: * -> *) a b.
Protocols appType initiatorCtx responderCtx bytes m a b
-> RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
sigSubmissionProtocol :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
sigSubmissionProtocol
                    , RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
keepAliveProtocol :: forall (appType :: Mode) initiatorCtx responderCtx bytes
       (m :: * -> *) a b.
Protocols appType initiatorCtx responderCtx bytes m a b
-> RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
keepAliveProtocol :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
keepAliveProtocol
                    , RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
peerSharingProtocol :: forall (appType :: Mode) initiatorCtx responderCtx bytes
       (m :: * -> *) a b.
Protocols appType initiatorCtx responderCtx bytes m a b
-> RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
peerSharingProtocol :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
peerSharingProtocol
                    }
                    NodeToNodeVersion
_version
                    NodeToNodeVersionData {
                      PeerSharing
peerSharing :: PeerSharing
peerSharing :: NodeToNodeVersionData -> PeerSharing
peerSharing
                    }
                    =
    WithProtocolTemperature
  'Hot [MiniProtocol appType initiatorCtx responderCtx bytes m a b]
-> WithProtocolTemperature
     'Warm [MiniProtocol appType initiatorCtx responderCtx bytes m a b]
-> WithProtocolTemperature
     'Established
     [MiniProtocol appType initiatorCtx responderCtx bytes m a b]
-> TemperatureBundle
     [MiniProtocol appType initiatorCtx responderCtx bytes m a b]
forall a.
WithProtocolTemperature 'Hot a
-> WithProtocolTemperature 'Warm a
-> WithProtocolTemperature 'Established a
-> TemperatureBundle a
TemperatureBundle
      -- Hot protocols
      ([MiniProtocol appType initiatorCtx responderCtx bytes m a b]
-> WithProtocolTemperature
     'Hot [MiniProtocol appType initiatorCtx responderCtx bytes m a b]
forall a. a -> WithProtocolTemperature 'Hot a
WithHot [
        MiniProtocol {
          miniProtocolNum :: MiniProtocolNum
miniProtocolNum    = MiniProtocolNum
sigSubmissionMiniProtocolNum
        , miniProtocolStart :: StartOnDemandOrEagerly
miniProtocolStart  = StartOnDemandOrEagerly
StartOnDemandAny
        , miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits = MiniProtocolLimits
sigSubmissionLimits
        , miniProtocolRun :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
miniProtocolRun    = RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
sigSubmissionProtocol
        }
      ])

      -- Warm protocols
      ([MiniProtocol appType initiatorCtx responderCtx bytes m a b]
-> WithProtocolTemperature
     'Warm [MiniProtocol appType initiatorCtx responderCtx bytes m a b]
forall a. a -> WithProtocolTemperature 'Warm a
WithWarm [])

      -- Established protocols: 'keep-alive', 'peer-sharing'.
      ([MiniProtocol appType initiatorCtx responderCtx bytes m a b]
-> WithProtocolTemperature
     'Established
     [MiniProtocol appType initiatorCtx responderCtx bytes m a b]
forall a. a -> WithProtocolTemperature 'Established a
WithEstablished ([MiniProtocol appType initiatorCtx responderCtx bytes m a b]
 -> WithProtocolTemperature
      'Established
      [MiniProtocol appType initiatorCtx responderCtx bytes m a b])
-> [MiniProtocol appType initiatorCtx responderCtx bytes m a b]
-> WithProtocolTemperature
     'Established
     [MiniProtocol appType initiatorCtx responderCtx bytes m a b]
forall a b. (a -> b) -> a -> b
$
        MiniProtocol {
          miniProtocolNum :: MiniProtocolNum
miniProtocolNum    = MiniProtocolNum
keepAliveMiniProtocolNum
        , miniProtocolStart :: StartOnDemandOrEagerly
miniProtocolStart  = StartOnDemandOrEagerly
StartOnDemandAny
        , miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits = MiniProtocolLimits
keepAliveLimits
        , miniProtocolRun :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
miniProtocolRun    = RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
keepAliveProtocol
        }
        MiniProtocol appType initiatorCtx responderCtx bytes m a b
-> [MiniProtocol appType initiatorCtx responderCtx bytes m a b]
-> [MiniProtocol appType initiatorCtx responderCtx bytes m a b]
forall a. a -> [a] -> [a]
: case PeerSharing
peerSharing of
            PeerSharing
PeerSharingEnabled ->
              [ MiniProtocol {
                  miniProtocolNum :: MiniProtocolNum
miniProtocolNum    = MiniProtocolNum
peerSharingMiniProtocolNum
                , miniProtocolStart :: StartOnDemandOrEagerly
miniProtocolStart  = StartOnDemandOrEagerly
StartOnDemand
                , miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits = MiniProtocolLimits
peerSharingLimits
                , miniProtocolRun :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
miniProtocolRun    = RunMiniProtocol appType initiatorCtx responderCtx bytes m a b
peerSharingProtocol
                }
              ]
            PeerSharing
PeerSharingDisabled ->
              []
      )

initiatorProtocols
  :: LimitsAndTimeouts addr
  -> Apps addr m a b
  -> NodeToNodeVersion
  -> NodeToNodeVersionData
  -> OuroborosBundleWithExpandedCtx 'InitiatorMode addr BL.ByteString m a Void
initiatorProtocols :: forall addr (m :: * -> *) a b.
LimitsAndTimeouts addr
-> Apps addr m a b
-> NodeToNodeVersion
-> NodeToNodeVersionData
-> OuroborosBundleWithExpandedCtx
     'InitiatorMode addr ByteString m a Void
initiatorProtocols LimitsAndTimeouts addr
limitsAndTimeouts
                   Apps {
                     ClientApp addr m a
aSigSubmissionClient :: forall addr (m :: * -> *) a b.
Apps addr m a b -> ClientApp addr m a
aSigSubmissionClient :: ClientApp addr m a
aSigSubmissionClient
                   , ClientApp addr m a
aKeepAliveClient :: forall addr (m :: * -> *) a b.
Apps addr m a b -> ClientApp addr m a
aKeepAliveClient :: ClientApp addr m a
aKeepAliveClient
                   , ClientApp addr m a
aPeerSharingClient :: forall addr (m :: * -> *) a b.
Apps addr m a b -> ClientApp addr m a
aPeerSharingClient :: ClientApp addr m a
aPeerSharingClient
                   }
                   NodeToNodeVersion
version =
  LimitsAndTimeouts addr
-> Protocols
     'InitiatorMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     Void
-> NodeToNodeVersion
-> NodeToNodeVersionData
-> OuroborosBundle
     'InitiatorMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     Void
forall addr (appType :: Mode) initiatorCtx responderCtx bytes
       (m :: * -> *) a b.
LimitsAndTimeouts addr
-> Protocols appType initiatorCtx responderCtx bytes m a b
-> NodeToNodeVersion
-> NodeToNodeVersionData
-> OuroborosBundle appType initiatorCtx responderCtx bytes m a b
nodeToNodeProtocols
    LimitsAndTimeouts addr
limitsAndTimeouts
    (Protocols {
      sigSubmissionProtocol :: RunMiniProtocol
  'InitiatorMode
  (ExpandedInitiatorContext addr m)
  (ResponderContext addr)
  ByteString
  m
  a
  Void
sigSubmissionProtocol =
        MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
-> RunMiniProtocol
     'InitiatorMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     Void
forall initiatorCtx bytes (m :: * -> *) a responderCtx.
MiniProtocolCb initiatorCtx bytes m a
-> RunMiniProtocol
     'InitiatorMode initiatorCtx responderCtx bytes m a Void
InitiatorProtocolOnly ((ExpandedInitiatorContext addr m
 -> Channel m ByteString -> m (a, Maybe ByteString))
-> MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb (ClientApp addr m a
aSigSubmissionClient NodeToNodeVersion
version))
    , keepAliveProtocol :: RunMiniProtocol
  'InitiatorMode
  (ExpandedInitiatorContext addr m)
  (ResponderContext addr)
  ByteString
  m
  a
  Void
keepAliveProtocol =
        MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
-> RunMiniProtocol
     'InitiatorMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     Void
forall initiatorCtx bytes (m :: * -> *) a responderCtx.
MiniProtocolCb initiatorCtx bytes m a
-> RunMiniProtocol
     'InitiatorMode initiatorCtx responderCtx bytes m a Void
InitiatorProtocolOnly ((ExpandedInitiatorContext addr m
 -> Channel m ByteString -> m (a, Maybe ByteString))
-> MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb (ClientApp addr m a
aKeepAliveClient NodeToNodeVersion
version))
    , peerSharingProtocol :: RunMiniProtocol
  'InitiatorMode
  (ExpandedInitiatorContext addr m)
  (ResponderContext addr)
  ByteString
  m
  a
  Void
peerSharingProtocol =
        MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
-> RunMiniProtocol
     'InitiatorMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     Void
forall initiatorCtx bytes (m :: * -> *) a responderCtx.
MiniProtocolCb initiatorCtx bytes m a
-> RunMiniProtocol
     'InitiatorMode initiatorCtx responderCtx bytes m a Void
InitiatorProtocolOnly ((ExpandedInitiatorContext addr m
 -> Channel m ByteString -> m (a, Maybe ByteString))
-> MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb (ClientApp addr m a
aPeerSharingClient NodeToNodeVersion
version))
    })
    NodeToNodeVersion
version

initiatorAndResponderProtocols
  :: LimitsAndTimeouts addr
  -> Apps addr m a b
  -> NodeToNodeVersion
  -> NodeToNodeVersionData
  -> OuroborosBundleWithExpandedCtx 'InitiatorResponderMode addr BL.ByteString m a b
initiatorAndResponderProtocols :: forall addr (m :: * -> *) a b.
LimitsAndTimeouts addr
-> Apps addr m a b
-> NodeToNodeVersion
-> NodeToNodeVersionData
-> OuroborosBundleWithExpandedCtx
     'InitiatorResponderMode addr ByteString m a b
initiatorAndResponderProtocols LimitsAndTimeouts addr
limitsAndTimeouts
                               Apps {
                                 ClientApp addr m a
aSigSubmissionClient :: forall addr (m :: * -> *) a b.
Apps addr m a b -> ClientApp addr m a
aSigSubmissionClient :: ClientApp addr m a
aSigSubmissionClient
                               , ServerApp addr m b
aSigSubmissionServer :: forall addr (m :: * -> *) a b.
Apps addr m a b -> ServerApp addr m b
aSigSubmissionServer :: ServerApp addr m b
aSigSubmissionServer
                               , ClientApp addr m a
aKeepAliveClient :: forall addr (m :: * -> *) a b.
Apps addr m a b -> ClientApp addr m a
aKeepAliveClient :: ClientApp addr m a
aKeepAliveClient
                               , ServerApp addr m b
aKeepAliveServer :: forall addr (m :: * -> *) a b.
Apps addr m a b -> ServerApp addr m b
aKeepAliveServer :: ServerApp addr m b
aKeepAliveServer
                               , ClientApp addr m a
aPeerSharingClient :: forall addr (m :: * -> *) a b.
Apps addr m a b -> ClientApp addr m a
aPeerSharingClient :: ClientApp addr m a
aPeerSharingClient
                               , ServerApp addr m b
aPeerSharingServer :: forall addr (m :: * -> *) a b.
Apps addr m a b -> ServerApp addr m b
aPeerSharingServer :: ServerApp addr m b
aPeerSharingServer
                               }
                               NodeToNodeVersion
version =
  LimitsAndTimeouts addr
-> Protocols
     'InitiatorResponderMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     b
-> NodeToNodeVersion
-> NodeToNodeVersionData
-> OuroborosBundle
     'InitiatorResponderMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     b
forall addr (appType :: Mode) initiatorCtx responderCtx bytes
       (m :: * -> *) a b.
LimitsAndTimeouts addr
-> Protocols appType initiatorCtx responderCtx bytes m a b
-> NodeToNodeVersion
-> NodeToNodeVersionData
-> OuroborosBundle appType initiatorCtx responderCtx bytes m a b
nodeToNodeProtocols
    LimitsAndTimeouts addr
limitsAndTimeouts
    (Protocols {
      sigSubmissionProtocol :: RunMiniProtocol
  'InitiatorResponderMode
  (ExpandedInitiatorContext addr m)
  (ResponderContext addr)
  ByteString
  m
  a
  b
sigSubmissionProtocol =
        MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
-> MiniProtocolCb (ResponderContext addr) ByteString m b
-> RunMiniProtocol
     'InitiatorResponderMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     b
forall initiatorCtx bytes (m :: * -> *) a responderCtx b.
MiniProtocolCb initiatorCtx bytes m a
-> MiniProtocolCb responderCtx bytes m b
-> RunMiniProtocol
     'InitiatorResponderMode initiatorCtx responderCtx bytes m a b
InitiatorAndResponderProtocol
           ((ExpandedInitiatorContext addr m
 -> Channel m ByteString -> m (a, Maybe ByteString))
-> MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb (ClientApp addr m a
aSigSubmissionClient NodeToNodeVersion
version))
           ((ResponderContext addr
 -> Channel m ByteString -> m (b, Maybe ByteString))
-> MiniProtocolCb (ResponderContext addr) ByteString m b
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb (ServerApp addr m b
aSigSubmissionServer NodeToNodeVersion
version))
    , keepAliveProtocol :: RunMiniProtocol
  'InitiatorResponderMode
  (ExpandedInitiatorContext addr m)
  (ResponderContext addr)
  ByteString
  m
  a
  b
keepAliveProtocol =
        MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
-> MiniProtocolCb (ResponderContext addr) ByteString m b
-> RunMiniProtocol
     'InitiatorResponderMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     b
forall initiatorCtx bytes (m :: * -> *) a responderCtx b.
MiniProtocolCb initiatorCtx bytes m a
-> MiniProtocolCb responderCtx bytes m b
-> RunMiniProtocol
     'InitiatorResponderMode initiatorCtx responderCtx bytes m a b
InitiatorAndResponderProtocol
           ((ExpandedInitiatorContext addr m
 -> Channel m ByteString -> m (a, Maybe ByteString))
-> MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb (ClientApp addr m a
aKeepAliveClient NodeToNodeVersion
version))
           ((ResponderContext addr
 -> Channel m ByteString -> m (b, Maybe ByteString))
-> MiniProtocolCb (ResponderContext addr) ByteString m b
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb (ServerApp addr m b
aKeepAliveServer NodeToNodeVersion
version))
    , peerSharingProtocol :: RunMiniProtocol
  'InitiatorResponderMode
  (ExpandedInitiatorContext addr m)
  (ResponderContext addr)
  ByteString
  m
  a
  b
peerSharingProtocol =
        MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
-> MiniProtocolCb (ResponderContext addr) ByteString m b
-> RunMiniProtocol
     'InitiatorResponderMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     b
forall initiatorCtx bytes (m :: * -> *) a responderCtx b.
MiniProtocolCb initiatorCtx bytes m a
-> MiniProtocolCb responderCtx bytes m b
-> RunMiniProtocol
     'InitiatorResponderMode initiatorCtx responderCtx bytes m a b
InitiatorAndResponderProtocol
           ((ExpandedInitiatorContext addr m
 -> Channel m ByteString -> m (a, Maybe ByteString))
-> MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb (ClientApp addr m a
aPeerSharingClient NodeToNodeVersion
version))
           ((ResponderContext addr
 -> Channel m ByteString -> m (b, Maybe ByteString))
-> MiniProtocolCb (ResponderContext addr) ByteString m b
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb (ServerApp addr m b
aPeerSharingServer NodeToNodeVersion
version))
    })
    NodeToNodeVersion
version

data Codecs addr m =
  Codecs {
    forall addr (m :: * -> *).
Codecs addr m
-> Codec SigSubmission DeserialiseFailure m ByteString
sigSubmissionCodec :: Codec SigSubmission
                            CBOR.DeserialiseFailure m BL.ByteString
  , forall addr (m :: * -> *).
Codecs addr m -> Codec KeepAlive DeserialiseFailure m ByteString
keepAliveCodec     :: Codec KeepAlive
                            CBOR.DeserialiseFailure m BL.ByteString
  , forall addr (m :: * -> *).
Codecs addr m
-> Codec (PeerSharing addr) DeserialiseFailure m ByteString
peerSharingCodec   :: Codec (Protocol.PeerSharing addr)
                            CBOR.DeserialiseFailure m BL.ByteString
  }

dmqCodecs :: MonadST m
          => (addr -> CBOR.Encoding)
          -> (forall s. CBOR.Decoder s addr)
          -> Codecs addr m
dmqCodecs :: forall (m :: * -> *) addr.
MonadST m =>
(addr -> Encoding) -> (forall s. Decoder s addr) -> Codecs addr m
dmqCodecs addr -> Encoding
encodeAddr forall s. Decoder s addr
decodeAddr =
  Codecs {
    sigSubmissionCodec :: Codec SigSubmission DeserialiseFailure m ByteString
sigSubmissionCodec = Codec SigSubmission DeserialiseFailure m ByteString
forall (m :: * -> *).
MonadST m =>
Codec SigSubmission DeserialiseFailure m ByteString
codecSigSubmission
  , keepAliveCodec :: Codec KeepAlive DeserialiseFailure m ByteString
keepAliveCodec     = Codec KeepAlive DeserialiseFailure m ByteString
forall (m :: * -> *).
MonadST m =>
Codec KeepAlive DeserialiseFailure m ByteString
codecKeepAlive_v2
  , peerSharingCodec :: Codec (PeerSharing addr) DeserialiseFailure m ByteString
peerSharingCodec   = (addr -> Encoding)
-> (forall s. Decoder s addr)
-> Codec (PeerSharing addr) DeserialiseFailure m ByteString
forall (m :: * -> *) peerAddress.
MonadST m =>
(peerAddress -> Encoding)
-> (forall s. Decoder s peerAddress)
-> Codec (PeerSharing peerAddress) DeserialiseFailure m ByteString
codecPeerSharing addr -> Encoding
encodeAddr Decoder s addr
forall s. Decoder s addr
decodeAddr
  }

data LimitsAndTimeouts addr =
  LimitsAndTimeouts {
    -- sig-submission
    forall addr. LimitsAndTimeouts addr -> MiniProtocolLimits
sigSubmissionLimits
      :: MiniProtocolLimits
  , forall addr.
LimitsAndTimeouts addr
-> ProtocolSizeLimits SigSubmission ByteString
sigSubmissionSizeLimits
      :: ProtocolSizeLimits SigSubmission BL.ByteString
  , forall addr.
LimitsAndTimeouts addr -> ProtocolTimeLimits SigSubmission
sigSubmissionTimeLimits
      :: ProtocolTimeLimits SigSubmission

    -- keep-alive
  , forall addr. LimitsAndTimeouts addr -> MiniProtocolLimits
keepAliveLimits
      :: MiniProtocolLimits
  , forall addr.
LimitsAndTimeouts addr -> ProtocolSizeLimits KeepAlive ByteString
keepAliveSizeLimits
      :: ProtocolSizeLimits KeepAlive BL.ByteString
  , forall addr. LimitsAndTimeouts addr -> ProtocolTimeLimits KeepAlive
keepAliveTimeLimits
      :: ProtocolTimeLimits KeepAlive

    -- peer sharing
  , forall addr. LimitsAndTimeouts addr -> MiniProtocolLimits
peerSharingLimits
      :: MiniProtocolLimits
  , forall addr.
LimitsAndTimeouts addr -> ProtocolTimeLimits (PeerSharing addr)
peerSharingTimeLimits
      :: ProtocolTimeLimits (Protocol.PeerSharing addr)
  , forall addr.
LimitsAndTimeouts addr
-> ProtocolSizeLimits (PeerSharing addr) ByteString
peerSharingSizeLimits
      :: ProtocolSizeLimits (Protocol.PeerSharing addr) BL.ByteString
  }

dmqLimitsAndTimeouts :: LimitsAndTimeouts addr
dmqLimitsAndTimeouts :: forall addr. LimitsAndTimeouts addr
dmqLimitsAndTimeouts =
    LimitsAndTimeouts {
      sigSubmissionLimits :: MiniProtocolLimits
sigSubmissionLimits =
        MiniProtocolLimits {
          -- TODO
          maximumIngressQueue :: Int
maximumIngressQueue = Int
forall a. Bounded a => a
maxBound
        }
    , sigSubmissionTimeLimits :: ProtocolTimeLimits SigSubmission
sigSubmissionTimeLimits = ProtocolTimeLimits SigSubmission
timeLimitsSigSubmission
    , sigSubmissionSizeLimits :: ProtocolSizeLimits SigSubmission ByteString
sigSubmissionSizeLimits = (ByteString -> Word) -> ProtocolSizeLimits SigSubmission ByteString
forall bytes.
(bytes -> Word) -> ProtocolSizeLimits SigSubmission bytes
byteLimitsSigSubmission ByteString -> Word
size

    , keepAliveLimits :: MiniProtocolLimits
keepAliveLimits     =
        MiniProtocolLimits {
          -- One small outstanding message.
          maximumIngressQueue :: Int
maximumIngressQueue = Int -> Int
addSafetyMargin Int
1280
        }

    , keepAliveTimeLimits :: ProtocolTimeLimits KeepAlive
keepAliveTimeLimits = ProtocolTimeLimits KeepAlive
timeLimitsKeepAlive
    , keepAliveSizeLimits :: ProtocolSizeLimits KeepAlive ByteString
keepAliveSizeLimits = (ByteString -> Word) -> ProtocolSizeLimits KeepAlive ByteString
forall bytes. (bytes -> Word) -> ProtocolSizeLimits KeepAlive bytes
byteLimitsKeepAlive ByteString -> Word
size

    , peerSharingLimits :: MiniProtocolLimits
peerSharingLimits   =
        MiniProtocolLimits {
          -- This protocol does not need to be pipelined and a peer can only ask
          -- for a maximum of 255 peers each time. Hence a reply can have up to
          -- 255 IP (IPv4 or IPv6) addresses so 255 * 16 = 4080. TCP has an initial
          -- window size of 4 and a TCP segment is 1440, which gives us 4 * 1440 =
          -- 5760 bytes to fit into a single RTT. So setting the maximum ingress
          -- queue to be a single RTT should be enough to cover for CBOR overhead.
          maximumIngressQueue :: Int
maximumIngressQueue = Int
4 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1440
        }
    , peerSharingTimeLimits :: ProtocolTimeLimits (PeerSharing addr)
peerSharingTimeLimits = ProtocolTimeLimits (PeerSharing addr)
forall peerAddress. ProtocolTimeLimits (PeerSharing peerAddress)
timeLimitsPeerSharing
    , peerSharingSizeLimits :: ProtocolSizeLimits (PeerSharing addr) ByteString
peerSharingSizeLimits = (ByteString -> Word)
-> ProtocolSizeLimits (PeerSharing addr) ByteString
forall peerAddress bytes.
(bytes -> Word)
-> ProtocolSizeLimits (PeerSharing peerAddress) bytes
byteLimitsPeerSharing ByteString -> Word
size
    }
  where
    size :: BL.ByteString -> Word
    size :: ByteString -> Word
size = Int64 -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> Word) -> (ByteString -> Int64) -> ByteString -> Word
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Int64
BL.length


type HandshakeTr ntnAddr = Mx.WithBearer (ConnectionId ntnAddr) (TraceSendRecv (Handshake NodeToNodeVersion CBOR.Term))

ntnHandshakeArguments
  :: MonadST m
  => Tracer m (HandshakeTr ntnAddr)
  -> HandshakeArguments
      (ConnectionId ntnAddr)
      NodeToNodeVersion
      NodeToNodeVersionData
      m
ntnHandshakeArguments :: forall (m :: * -> *) ntnAddr.
MonadST m =>
Tracer m (HandshakeTr ntnAddr)
-> HandshakeArguments
     (ConnectionId ntnAddr) NodeToNodeVersion NodeToNodeVersionData m
ntnHandshakeArguments Tracer m (HandshakeTr ntnAddr)
tracer =
  HandshakeArguments {
    haHandshakeTracer :: Tracer m (HandshakeTr ntnAddr)
haHandshakeTracer  = Tracer m (HandshakeTr ntnAddr)
tracer
  , haBearerTracer :: Tracer m (WithBearer (ConnectionId ntnAddr) BearerTrace)
haBearerTracer     = Tracer m (WithBearer (ConnectionId ntnAddr) BearerTrace)
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer -- TODO
  , haHandshakeCodec :: Codec
  (Handshake NodeToNodeVersion Term) DeserialiseFailure m ByteString
haHandshakeCodec   = CodecCBORTerm (Text, Maybe Int) NodeToNodeVersion
-> Codec
     (Handshake NodeToNodeVersion Term) DeserialiseFailure m ByteString
forall vNumber (m :: * -> *) failure.
(MonadST m, Ord vNumber, Show failure) =>
CodecCBORTerm (failure, Maybe Int) vNumber
-> Codec (Handshake vNumber Term) DeserialiseFailure m ByteString
codecHandshake CodecCBORTerm (Text, Maybe Int) NodeToNodeVersion
nodeToNodeVersionCodec
  , haVersionDataCodec :: VersionDataCodec Term NodeToNodeVersion NodeToNodeVersionData
haVersionDataCodec = (NodeToNodeVersion -> CodecCBORTerm Text NodeToNodeVersionData)
-> VersionDataCodec Term NodeToNodeVersion NodeToNodeVersionData
forall vNumber vData.
(vNumber -> CodecCBORTerm Text vData)
-> VersionDataCodec Term vNumber vData
cborTermVersionDataCodec NodeToNodeVersion -> CodecCBORTerm Text NodeToNodeVersionData
nodeToNodeCodecCBORTerm
  , haAcceptVersion :: NodeToNodeVersionData
-> NodeToNodeVersionData -> Accept NodeToNodeVersionData
haAcceptVersion    = NodeToNodeVersionData
-> NodeToNodeVersionData -> Accept NodeToNodeVersionData
forall v. Acceptable v => v -> v -> Accept v
acceptableVersion
  , haQueryVersion :: NodeToNodeVersionData -> Bool
haQueryVersion     = NodeToNodeVersionData -> Bool
forall v. Queryable v => v -> Bool
queryVersion
  , haTimeLimits :: ProtocolTimeLimits (Handshake NodeToNodeVersion Term)
haTimeLimits       = ProtocolTimeLimits (Handshake NodeToNodeVersion Term)
forall {k} (vNumber :: k).
ProtocolTimeLimits (Handshake vNumber Term)
timeLimitsHandshake
  }

stdVersionDataNTN :: NetworkMagic
                  -> DiffusionMode
                  -> PeerSharing
                  -> NodeToNodeVersionData
stdVersionDataNTN :: NetworkMagic
-> DiffusionMode -> PeerSharing -> NodeToNodeVersionData
stdVersionDataNTN NetworkMagic
networkMagic DiffusionMode
diffusionMode PeerSharing
peerSharing =
  NodeToNodeVersionData
    { NetworkMagic
networkMagic :: NetworkMagic
networkMagic :: NetworkMagic
networkMagic
    , DiffusionMode
diffusionMode :: DiffusionMode
diffusionMode :: DiffusionMode
diffusionMode
    , PeerSharing
peerSharing :: PeerSharing
peerSharing :: PeerSharing
peerSharing
    , query :: Bool
query = Bool
False
    }

-- TODO: choose wisely, is a protocol parameter.
_MAX_SIGS_TO_ACK :: NumTxIdsToAck
_MAX_SIGS_TO_ACK :: NumTxIdsToAck
_MAX_SIGS_TO_ACK = NumTxIdsToAck
20

_SIG_SUBMISSION_INIT_DELAY :: TxSubmissionInitDelay
_SIG_SUBMISSION_INIT_DELAY :: TxSubmissionInitDelay
_SIG_SUBMISSION_INIT_DELAY = TxSubmissionInitDelay
NoTxSubmissionInitDelay


-- TODO: this is duplicated code, similar function is in
-- `Cardano.Network.NodeToNode` module.
addSafetyMargin :: Int -> Int
addSafetyMargin :: Int -> Int
addSafetyMargin Int
x = Int
x Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
x Int -> Int -> Int
forall a. Integral a => a -> a -> a
`div` Int
10