{-# LANGUAGE DataKinds           #-}
{-# LANGUAGE FlexibleInstances   #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE NumericUnderscores  #-}
{-# LANGUAGE RankNTypes          #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies        #-}
{-# LANGUAGE TypeOperators       #-}

{-# OPTIONS_GHC -Wno-orphans #-}

-- | This is the starting point for a module that will bring together the
-- overall node to node protocol, as a collection of mini-protocols.
--
module Ouroboros.Network.NodeToNode
  ( nodeToNodeProtocols
  , NodeToNodeProtocols (..)
  , NodeToNodeProtocolsWithExpandedCtx
  , NodeToNodeProtocolsWithMinimalCtx
  , MiniProtocolParameters (..)
  , chainSyncProtocolLimits
  , blockFetchProtocolLimits
  , txSubmissionProtocolLimits
  , keepAliveProtocolLimits
  , defaultMiniProtocolParameters
  , NodeToNodeVersion (..)
  , NodeToNodeVersionData (..)
  , NetworkConnectTracers (..)
  , nullNetworkConnectTracers
  , connectTo
  , AcceptedConnectionsLimit (..)
    -- * P2P Governor
  , PeerAdvertise (..)
  , PeerSelectionTargets (..)
    -- * Subscription Workers
    -- ** Versions
  , Versions (..)
  , DiffusionMode (..)
  , simpleSingletonVersions
  , foldMapVersions
  , combineVersions
    -- *** Codecs
  , nodeToNodeHandshakeCodec
  , nodeToNodeVersionCodec
  , nodeToNodeCodecCBORTerm
    -- * Re-exports
  , ExpandedInitiatorContext (..)
  , MinimalInitiatorContext (..)
  , ResponderContext (..)
  , ConnectionId (..)
  , ControlMessage (..)
  , ControlMessageSTM
  , RemoteAddress
  , RemoteConnectionId
  , IsBigLedgerPeer (..)
  , NumTxIdsToAck (..)
  , ProtocolLimitFailure
  , Handshake
  , Socket
    -- ** Exceptions
  , ExceptionInHandler (..)
    -- ** Traces
  , AcceptConnectionsPolicyTrace (..)
  , TraceSendRecv (..)
  , HandshakeTr
    -- * For Consensus ThreadNet Tests
  , chainSyncMiniProtocolNum
  , blockFetchMiniProtocolNum
  , txSubmissionMiniProtocolNum
  , keepAliveMiniProtocolNum
  , peerSharingMiniProtocolNum
  ) where

import Control.Exception (SomeException)

import Codec.CBOR.Term qualified as CBOR
import Data.ByteString.Lazy qualified as BL
import Data.Word
import Network.Mux qualified as Mx
import Network.Socket (Socket, StructLinger (..))
import Network.Socket qualified as Socket

import Ouroboros.Network.ConnectionManager.Types (ExceptionInHandler (..))
import Ouroboros.Network.Context
import Ouroboros.Network.ControlMessage (ControlMessage (..))
import Ouroboros.Network.Driver (TraceSendRecv (..))
import Ouroboros.Network.Driver.Limits (ProtocolLimitFailure (..))
import Ouroboros.Network.Mux
import Ouroboros.Network.NodeToNode.Version
import Ouroboros.Network.PeerSelection.Governor.Types
           (PeerSelectionTargets (..))
import Ouroboros.Network.PeerSelection.PeerAdvertise (PeerAdvertise (..))
import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing (..))
import Ouroboros.Network.Protocol.Handshake.Codec
import Ouroboros.Network.Protocol.Handshake.Type
import Ouroboros.Network.Protocol.Handshake.Version hiding (Accept)
import Ouroboros.Network.Protocol.TxSubmission2.Type (NumTxIdsToAck (..))
import Ouroboros.Network.Server.RateLimiting
import Ouroboros.Network.Snocket
import Ouroboros.Network.Socket
import Ouroboros.Network.Util.ShowProxy (ShowProxy, showProxy)


-- | Shorthand for the (rather verbose) type of handshake trace events:
-- 'TraceSendRecv' events of the 'Handshake' protocol over 'CBOR.Term',
-- tagged via 'Mx.WithBearer' with the 'ConnectionId' they belong to.
type HandshakeTr ntnAddr ntnVersion =
  Mx.WithBearer
    (ConnectionId ntnAddr)
    (TraceSendRecv (Handshake ntnVersion CBOR.Term))


-- | The bundle of mini-protocol applications which together make up the
-- node-to-node protocol.  Every field is a 'RunMiniProtocol' sharing the
-- same mode, contexts, bytes type, monad and result types.
--
data NodeToNodeProtocols appType initiatorCtx responderCtx bytes m a b = NodeToNodeProtocols {
    -- | chain-sync mini-protocol
    --
    chainSyncProtocol    :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b,

    -- | block-fetch mini-protocol
    --
    blockFetchProtocol   :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b,

    -- | tx-submission mini-protocol
    --
    txSubmissionProtocol :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b,

    -- | keep-alive mini-protocol
    --
    keepAliveProtocol    :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b,

    -- | peer sharing mini-protocol
    --
    peerSharingProtocol  :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b

  }

-- | 'NodeToNodeProtocols' whose initiator side runs with an
-- 'ExpandedInitiatorContext' and whose responder side runs with a
-- 'ResponderContext'.
type NodeToNodeProtocolsWithExpandedCtx appType ntnAddr bytes m a b =
    NodeToNodeProtocols
      appType
      (ExpandedInitiatorContext ntnAddr m)
      (ResponderContext ntnAddr)
      bytes m a b
-- | 'NodeToNodeProtocols' whose initiator side runs with a
-- 'MinimalInitiatorContext' and whose responder side runs with a
-- 'ResponderContext'.
type NodeToNodeProtocolsWithMinimalCtx appType ntnAddr bytes m a b =
    NodeToNodeProtocols
      appType
      (MinimalInitiatorContext ntnAddr)
      (ResponderContext ntnAddr)
      bytes m a b


-- | Tunable parameters of the node-to-node mini-protocols.  They are used
-- in this module to compute the per-protocol ingress queue limits.
--
data MiniProtocolParameters = MiniProtocolParameters {
      chainSyncPipeliningHighMark :: !Word16,
      -- ^ high threshold for pipelining (we will never exceed that many
      -- messages pipelined).

      chainSyncPipeliningLowMark  :: !Word16,
      -- ^ low threshold: if we hit the 'chainSyncPipeliningHighMark' we will
      -- listen for responses until there are at most
      -- 'chainSyncPipeliningLowMark' pipelined messages.
      --
      -- Must be smaller than 'chainSyncPipeliningHighMark'.
      --
      -- Note: 'chainSyncPipeliningLowMark' and 'chainSyncPipeliningHighMark'
      -- are passed to 'pipelineDecisionLowHighMark'.

      blockFetchPipeliningMax     :: !Word16,
      -- ^ maximal number of pipelined messages in 'block-fetch' mini-protocol.

      txSubmissionMaxUnacked      :: !NumTxIdsToAck
      -- ^ maximal number of unacked tx (pipelining is bounded by twice this
      -- number)
    }

-- | Default values of 'MiniProtocolParameters'.
--
-- The chain-sync low mark (200) is below the high mark (300), as the
-- field documentation requires.
--
defaultMiniProtocolParameters :: MiniProtocolParameters
defaultMiniProtocolParameters = MiniProtocolParameters {
      chainSyncPipeliningLowMark  = 200
    , chainSyncPipeliningHighMark = 300
    , blockFetchPipeliningMax     = 100
    , txSubmissionMaxUnacked      = 10
  }

-- | Make an 'OuroborosApplication' for the bundle of mini-protocols that
-- make up the overall node-to-node protocol.
--
-- This function specifies the wire format protocol numbers.
--
-- The application specific protocol numbers start from 2.  The
-- @'MiniProtocolNum' 0@ is reserved for the 'Handshake' protocol, while
-- @'MiniProtocolNum' 1@ is reserved for DeltaQ messages.
-- 'Handshake' protocol is not included in 'NodeToNodeProtocols' as it runs
-- before mux is started but it is reusing 'MuxBearer' to send and receive
-- messages.  Only when the handshake protocol succeeds, we will know which
-- protocols to run / multiplex.
--
-- These are chosen to not overlap with the node to client protocol numbers (and
-- the handshake protocol number).  This is not essential for correctness, but
-- is helpful to allow a single shared implementation of tools that can analyse
-- both protocols, e.g.  wireshark plugins.
--
-- The negotiated version number is currently ignored; from the negotiated
-- version data only 'peerSharing' is inspected, to decide whether the
-- peer-sharing mini-protocol is included in the established bundle.
--
nodeToNodeProtocols
  :: MiniProtocolParameters
  -> NodeToNodeProtocols muxMode initiatorCtx responderCtx bytes m a b
  -> NodeToNodeVersion
  -- ^ negotiated version number
  -> NodeToNodeVersionData
  -- ^ negotiated version data
  -> OuroborosBundle muxMode initiatorCtx responderCtx bytes m a b
nodeToNodeProtocols miniProtocolParameters protocols
                    _version NodeToNodeVersionData { peerSharing }
                    =
    TemperatureBundle
      -- Hot protocols: 'chain-sync', 'block-fetch' and 'tx-submission'.
      (WithHot $
        case protocols of
          NodeToNodeProtocols { chainSyncProtocol,
                                blockFetchProtocol,
                                txSubmissionProtocol
                              } ->
            [ MiniProtocol {
                miniProtocolNum    = chainSyncMiniProtocolNum,
                miniProtocolStart  = StartOnDemand,
                miniProtocolLimits = chainSyncProtocolLimits miniProtocolParameters,
                miniProtocolRun    = chainSyncProtocol
              }
            , MiniProtocol {
                miniProtocolNum    = blockFetchMiniProtocolNum,
                miniProtocolStart  = StartOnDemand,
                miniProtocolLimits = blockFetchProtocolLimits miniProtocolParameters,
                miniProtocolRun    = blockFetchProtocol
              }
            , MiniProtocol {
                miniProtocolNum    = txSubmissionMiniProtocolNum,
                miniProtocolStart  = StartOnDemand,
                miniProtocolLimits = txSubmissionProtocolLimits miniProtocolParameters,
                miniProtocolRun    = txSubmissionProtocol
              }
            ])

      -- Warm protocols: reserved for 'tip-sample'.
      (WithWarm [])

      -- Established protocols: 'keep-alive', and 'peer-sharing' when it was
      -- enabled in the negotiated version data.
      (WithEstablished $
        case protocols of
          NodeToNodeProtocols { keepAliveProtocol,
                                peerSharingProtocol } ->
              MiniProtocol {
                miniProtocolNum    = keepAliveMiniProtocolNum,
                miniProtocolStart  = StartOnDemandAny,
                miniProtocolLimits = keepAliveProtocolLimits miniProtocolParameters,
                miniProtocolRun    = keepAliveProtocol
              }
            : case peerSharing of
                PeerSharingEnabled ->
                  [ MiniProtocol {
                      miniProtocolNum    = peerSharingMiniProtocolNum,
                      miniProtocolStart  = StartOnDemand,
                      miniProtocolLimits = peerSharingProtocolLimits miniProtocolParameters,
                      miniProtocolRun    = peerSharingProtocol
                    }
                  ]
                PeerSharingDisabled ->
                  []
      )

-- | Increase a size by a 10% safety margin.
--
-- The margin is computed with integer division (@x `div` 10@), so for
-- non-negative inputs it is rounded down; e.g. @addSafetyMargin 9 == 9@.
--
addSafetyMargin :: Int -> Int
addSafetyMargin x = x + x `div` 10

-- | Per mini-protocol limits derived from 'MiniProtocolParameters'.  Each of
-- these functions sets the 'maximumIngressQueue' bound (in bytes) of the
-- corresponding mini-protocol.
--
chainSyncProtocolLimits
  , blockFetchProtocolLimits
  , txSubmissionProtocolLimits
  , keepAliveProtocolLimits
  , peerSharingProtocolLimits :: MiniProtocolParameters -> MiniProtocolLimits

-- Ingress queue limit of the chain-sync mini-protocol: room for
-- 'chainSyncPipeliningHighMark' messages of 1400 bytes each, plus the
-- safety margin.  (The type signature is given in the grouped declaration
-- shared by all @*ProtocolLimits@ functions.)
chainSyncProtocolLimits MiniProtocolParameters { chainSyncPipeliningHighMark } =
  MiniProtocolLimits {
      -- The largest message over ChainSync is @MsgRollForward@ which mainly
      -- consists of a BlockHeader.
      -- TODO: 1400 comes from maxBlockHeaderSize in genesis, but should come
      -- from consensus rather than being hard coded.
      maximumIngressQueue = addSafetyMargin $
        fromIntegral chainSyncPipeliningHighMark * 1400
    }

-- Ingress queue limit of the block-fetch mini-protocol.  (The type
-- signature is given in the grouped declaration shared by all
-- @*ProtocolLimits@ functions.)
blockFetchProtocolLimits MiniProtocolParameters { blockFetchPipeliningMax } = MiniProtocolLimits {
    -- block-fetch client can pipeline at most 'blockFetchPipeliningMax'
    -- blocks (currently '100' in 'defaultMiniProtocolParameters').  This is
    -- currently hard coded in
    -- 'Ouroboros.Network.BlockFetch.blockFetchLogic' (where
    -- @maxInFlightReqsPerPeer = 100@ is specified).  In the future the
    -- block fetch client will count bytes rather than blocks.  By far
    -- the largest (and the only pipelined message) in 'block-fetch'
    -- protocol is 'MsgBlock'.  Current block size limit is 88kiB
    -- (@90_112@ bytes) and `blockFetchPipeliningMax` is set to `100` by
    -- default.  This means that overall queue limit must be at least:
    --
    --   ```
    --   100 * 88kiB = 8800kiB (~8.6MiB)
    --   ```
    --
    -- In the byron era this limit was set to `10 * 2MiB`, we keep the more
    -- relaxed limit here: the result is the larger of the two bounds, plus
    -- the safety margin.
    --
    -- NOTE(review): 2MiB is @2_097_152@ bytes; the literal below is two
    -- bytes larger.  It is kept unchanged here -- confirm whether the extra
    -- bytes are intentional.
    --
    maximumIngressQueue = addSafetyMargin $
      max (10 * 2_097_154 :: Int) (fromIntegral blockFetchPipeliningMax * 90_112)
  }

-- Ingress queue limit of the tx-submission mini-protocol.  (The type
-- signature is given in the grouped declaration shared by all
-- @*ProtocolLimits@ functions.)
txSubmissionProtocolLimits MiniProtocolParameters { txSubmissionMaxUnacked } = MiniProtocolLimits {
      -- tx-submission server can pipeline both 'MsgRequestTxIds' and
      -- 'MsgRequestTx'. This means that there can be many
      -- 'MsgReplyTxIds', 'MsgReplyTxs' messages in an inbound queue (their
      -- sizes are strictly greater than the corresponding request
      -- messages).
      --
      -- Each 'MsgRequestTxIds' can contain at max @maxTxIdsToRequest = 3@
      -- (defined in 'Ouroboros.Network.TxSubmission.Inbound.txSubmissionInbound')
      --
      -- Each 'MsgRequestTx' can request at max @maxTxToRequest = 2@
      -- (defined in 'Ouroboros.Network.TxSubmission.Inbound.txSubmissionInbound')
      --
      -- The 'txSubmissionInbound' server can at most put `100`
      -- unacknowledged transactions.  It also pipelines both
      -- 'MsgRequestTxIds' and 'MsgRequestTx' in turn. This means that the
      -- inbound queue can have at most `100` 'MsgRequestTxIds' and
      -- 'MsgRequestTx' which will contain a single `TxId` / `Tx`.
      --
      -- The number of unacknowledged transactions is taken from
      -- 'txSubmissionMaxUnacked'; the figure of `100` used in this comment
      -- is a worked example.
      -- NOTE(review): 'defaultMiniProtocolParameters' sets
      -- 'txSubmissionMaxUnacked' to `10`, not `100` -- confirm which value
      -- the prose should describe.
      --
      -- The upper bound of size of a single transaction is 64k, while the
      -- size of `TxId` is `34` bytes (`type TxId = Hash Tx`).
      --
      -- Ingress side of 'txSubmissionInbound'
      --
      -- - 'MsgReplyTxIds' carrying a single `TxId`:
      -- ```
      --    1  -- encodeListLen 2
      --  + 1  -- encodeWord 1
      --  + 1  -- encodeListLenIndef
      --  + 1  -- encodeListLen 2
      --  + 34 -- encode 'TxId'
      --  + 5  -- encodeWord32 (size of tx)
      --  + 1  -- encodeBreak
      --  = 44
      -- ```
      -- - 'MsgReplyTx' carrying a single 'Tx':
      -- ```
      --    1      -- encodeListLen 2
      --  + 1      -- encodeWord 3
      --  + 1      -- encodeListLenIndef
      --  + 65_536 -- 64kiB transaction
      --  + 1      -- encodeBreak
      --  = 65_540
      -- ```
      --
      -- On the ingress side of 'txSubmissionOutbound' we can have at most
      -- 'txSubmissionMaxUnacked' 'MsgRequestTxIds' and the same amount of
      -- 'MsgRequestTx' containing a single 'TxId'.  The size of
      -- 'MsgRequestTxIds' is much smaller than 'MsgReplyTx', and the size
      -- of 'MsgRequestTx' with a single 'TxId' is smaller than
      -- 'MsgReplyTxIds' which contains a single 'TxId' (it just contains
      -- the 'TxId' without the size of 'Tx' in bytes).  So the ingress
      -- queue of 'txSubmissionOutbound' is bounded by the ingress side of
      -- the 'txSubmissionInbound'
      --
      -- For a 'txSubmissionMaxUnacked' of '100' the upper bound is
      -- `100 * (44 + 65_540) = 6_558_400`, we add 10% as a safety margin.
      --
      maximumIngressQueue = addSafetyMargin $
          fromIntegral txSubmissionMaxUnacked * (44 + 65_540)
    }

-- Ingress queue limit of the keep-alive mini-protocol; it does not depend
-- on the 'MiniProtocolParameters'.  (The type signature is given in the
-- grouped declaration shared by all @*ProtocolLimits@ functions.)
keepAliveProtocolLimits _ =
  MiniProtocolLimits {
      -- One small outstanding message.
      maximumIngressQueue = addSafetyMargin 1280
    }

-- Ingress queue limit of the peer-sharing mini-protocol; it does not
-- depend on the 'MiniProtocolParameters' and, unlike the other limits, is
-- used without 'addSafetyMargin'.  (The type signature is given in the
-- grouped declaration shared by all @*ProtocolLimits@ functions.)
peerSharingProtocolLimits _ =
  MiniProtocolLimits {
  -- This protocol does not need to be pipelined and a peer can only ask
  -- for a maximum of 255 peers each time. Hence a reply can have up to
  -- 255 IP (IPv4 or IPv6) addresses so 255 * 16 = 4080. TCP has an initial
  -- window size of 4 and a TCP segment is 1440, which gives us 4 * 1440 =
  -- 5760 bytes to fit into a single RTT. So setting the maximum ingress
  -- queue to be a single RTT should be enough to cover for CBOR overhead.
  maximumIngressQueue = 4 * 1440
  }

-- | Wire-format protocol number of the chain-sync mini-protocol.
chainSyncMiniProtocolNum :: MiniProtocolNum
chainSyncMiniProtocolNum = MiniProtocolNum 2

-- | Wire-format protocol number of the block-fetch mini-protocol.
blockFetchMiniProtocolNum :: MiniProtocolNum
blockFetchMiniProtocolNum = MiniProtocolNum 3

-- | Wire-format protocol number of the tx-submission mini-protocol.
txSubmissionMiniProtocolNum :: MiniProtocolNum
txSubmissionMiniProtocolNum = MiniProtocolNum 4

-- | Wire-format protocol number of the keep-alive mini-protocol.
keepAliveMiniProtocolNum :: MiniProtocolNum
keepAliveMiniProtocolNum = MiniProtocolNum 8

-- | Wire-format protocol number of the peer-sharing mini-protocol.
peerSharingMiniProtocolNum :: MiniProtocolNum
peerSharingMiniProtocolNum = MiniProtocolNum 10

-- | A specialised version of @'Ouroboros.Network.Socket.connectToNode'@.
--
-- It fixes the node-to-node handshake codec, handshake time limits,
-- version data codec and handshake callbacks ('acceptableVersion' and
-- 'queryVersion'), and uses 'makeSocketBearer' as the bearer.  The
-- remaining arguments (versions, optional local address, remote address)
-- are passed through to 'connectToNode'.
--
-- The socket configuration callback passed to 'connectToNode' sets the
-- 'Socket.NoDelay' option and enables 'Socket.Linger' with a timeout of
-- @0@ on the outbound socket.
--
connectTo
  :: Snocket IO Socket.Socket Socket.SockAddr
  -> NetworkConnectTracers Socket.SockAddr NodeToNodeVersion
  -> Versions NodeToNodeVersion
              NodeToNodeVersionData
              (OuroborosApplicationWithMinimalCtx
                 Mx.InitiatorMode Socket.SockAddr BL.ByteString IO a b)
  -> Maybe Socket.SockAddr
  -> Socket.SockAddr
  -> IO (Either SomeException (Either a b))
connectTo sn tr =
    connectToNode sn makeSocketBearer
                  ConnectToArgs {
                    ctaHandshakeCodec      = nodeToNodeHandshakeCodec,
                    ctaHandshakeTimeLimits = timeLimitsHandshake,
                    ctaVersionDataCodec    = cborTermVersionDataCodec nodeToNodeCodecCBORTerm,
                    ctaConnectTracers      = tr,
                    ctaHandshakeCallbacks  = HandshakeCallbacks acceptableVersion queryVersion
                  }
                  configureOutboundSocket
  where
    -- Socket options applied to the outbound socket.
    configureOutboundSocket :: Socket -> IO ()
    configureOutboundSocket sock = do
        Socket.setSocketOption sock Socket.NoDelay 1
        Socket.setSockOpt sock Socket.Linger
                              (StructLinger { sl_onoff  = 1,
                                              sl_linger = 0 })


-- | Address type of remote peers: a socket address.
type RemoteAddress = Socket.SockAddr

-- Orphan instance (see the @-Wno-orphans@ pragma at the top of the module):
-- the proxy of 'RemoteAddress' is shown as @"SockAddr"@.
instance ShowProxy RemoteAddress where
  showProxy _ = "SockAddr"

-- | A 'ConnectionId' whose addresses are of type 'RemoteAddress'.
type RemoteConnectionId =
  ConnectionId RemoteAddress