{-# LANGUAGE DataKinds           #-}
{-# LANGUAGE FlexibleInstances   #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE NumericUnderscores  #-}
{-# LANGUAGE RankNTypes          #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies        #-}
{-# LANGUAGE TypeOperators       #-}

{-# OPTIONS_GHC -Wno-orphans #-}

-- | This is the starting point for a module that will bring together the
-- overall node to node protocol, as a collection of mini-protocols.
--
module Ouroboros.Network.NodeToNode
  ( nodeToNodeProtocols
  , NodeToNodeProtocols (..)
  , NodeToNodeProtocolsWithExpandedCtx
  , NodeToNodeProtocolsWithMinimalCtx
  , MiniProtocolParameters (..)
  , chainSyncProtocolLimits
  , blockFetchProtocolLimits
  , txSubmissionProtocolLimits
  , keepAliveProtocolLimits
  , defaultMiniProtocolParameters
  , NodeToNodeVersion (..)
  , NodeToNodeVersionData (..)
  , NetworkConnectTracers (..)
  , nullNetworkConnectTracers
  , connectTo
  , NetworkServerTracers (..)
  , nullNetworkServerTracers
  , NetworkMutableState (..)
  , AcceptedConnectionsLimit (..)
  , newNetworkMutableState
  , newNetworkMutableStateSTM
  , cleanNetworkMutableState
  , withServer
    -- * P2P Governor
  , PeerAdvertise (..)
  , PeerSelectionTargets (..)
    -- * Subscription Workers
    -- ** IP subscription worker
  , IPSubscriptionTarget (..)
  , NetworkIPSubscriptionTracers
  , NetworkSubscriptionTracers (..)
  , nullNetworkSubscriptionTracers
  , SubscriptionParams (..)
  , IPSubscriptionParams
  , ipSubscriptionWorker
    -- ** DNS subscription worker
  , DnsSubscriptionTarget (..)
  , DnsSubscriptionParams
  , NetworkDNSSubscriptionTracers (..)
  , nullNetworkDNSSubscriptionTracers
  , dnsSubscriptionWorker
    -- ** Versions
  , Versions (..)
  , DiffusionMode (..)
  , simpleSingletonVersions
  , foldMapVersions
  , combineVersions
    -- *** Codecs
  , nodeToNodeHandshakeCodec
  , nodeToNodeVersionCodec
  , nodeToNodeCodecCBORTerm
    -- * Re-exports
  , ExpandedInitiatorContext (..)
  , MinimalInitiatorContext (..)
  , ResponderContext (..)
  , ConnectionId (..)
  , ControlMessage (..)
  , ControlMessageSTM
  , RemoteAddress
  , RemoteConnectionId
  , IsBigLedgerPeer (..)
  , NumTxIdsToAck (..)
  , ProtocolLimitFailure
  , Handshake
  , LocalAddresses (..)
  , Socket
    -- ** Exceptions
  , ExceptionInHandler (..)
    -- ** Error Policies and Peer state
  , ErrorPolicies (..)
  , remoteNetworkErrorPolicy
  , localNetworkErrorPolicy
  , nullErrorPolicies
  , ErrorPolicy (..)
  , SuspendDecision (..)
    -- ** Traces
  , AcceptConnectionsPolicyTrace (..)
  , TraceSendRecv (..)
  , SubscriptionTrace (..)
  , DnsTrace (..)
  , ErrorPolicyTrace (..)
  , WithIPList (..)
  , WithDomainName (..)
  , WithAddr (..)
  , HandshakeTr
    -- * For Consensus ThreadNet Tests
  , chainSyncMiniProtocolNum
  , blockFetchMiniProtocolNum
  , txSubmissionMiniProtocolNum
  , keepAliveMiniProtocolNum
  , peerSharingMiniProtocolNum
  ) where

import Control.Concurrent.Async qualified as Async
import Control.Exception (IOException, SomeException)
import Control.Monad.Class.MonadTime.SI (DiffTime)

import Codec.CBOR.Read qualified as CBOR
import Codec.CBOR.Term qualified as CBOR
import Data.ByteString.Lazy qualified as BL
import Data.Functor (void)
import Data.Void (Void)
import Data.Word
import Network.Mux qualified as Mx
import Network.Socket (Socket, StructLinger (..))
import Network.Socket qualified as Socket

import Ouroboros.Network.BlockFetch.Client (BlockFetchProtocolFailure)
import Ouroboros.Network.ConnectionManager.Types (ExceptionInHandler (..))
import Ouroboros.Network.Context
import Ouroboros.Network.ControlMessage (ControlMessage (..))
import Ouroboros.Network.Driver (TraceSendRecv (..))
import Ouroboros.Network.Driver.Limits (ProtocolLimitFailure (..))
import Ouroboros.Network.Driver.Simple (DecoderFailure)
import Ouroboros.Network.ErrorPolicy
import Ouroboros.Network.IOManager
import Ouroboros.Network.Mux
import Ouroboros.Network.NodeToNode.Version
import Ouroboros.Network.PeerSelection.Governor.Types
           (PeerSelectionTargets (..))
import Ouroboros.Network.PeerSelection.PeerAdvertise (PeerAdvertise (..))
import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing (..))
import Ouroboros.Network.Protocol.Handshake.Codec
import Ouroboros.Network.Protocol.Handshake.Type
import Ouroboros.Network.Protocol.Handshake.Version hiding (Accept)
import Ouroboros.Network.Protocol.TxSubmission2.Type (NumTxIdsToAck (..))
import Ouroboros.Network.Snocket
import Ouroboros.Network.Socket
import Ouroboros.Network.Subscription.Dns (DnsSubscriptionParams,
           DnsSubscriptionTarget (..), DnsTrace (..), WithDomainName (..))
import Ouroboros.Network.Subscription.Dns qualified as Subscription
import Ouroboros.Network.Subscription.Ip (IPSubscriptionParams,
           IPSubscriptionTarget (..), SubscriptionParams (..),
           SubscriptionTrace (..), WithIPList (..))
import Ouroboros.Network.Subscription.Ip qualified as Subscription
import Ouroboros.Network.Subscription.Worker (LocalAddresses (..),
           SubscriberError)
import Ouroboros.Network.Tracers
import Ouroboros.Network.TxSubmission.Inbound qualified as TxInbound
import Ouroboros.Network.TxSubmission.Outbound qualified as TxOutbound
import Ouroboros.Network.Util.ShowProxy (ShowProxy, showProxy)


-- | Shorthand for the (otherwise unwieldy) type of handshake trace events:
-- a 'TraceSendRecv' of the 'Handshake' protocol, wrapped in 'Mx.WithBearer'
-- together with the 'ConnectionId' of the connection.
type HandshakeTr ntnAddr ntnVersion =
  Mx.WithBearer
    (ConnectionId ntnAddr)
    (TraceSendRecv (Handshake ntnVersion CBOR.Term))


-- | The bundle of mini-protocols which make up the node-to-node protocol.
-- Every field is a 'RunMiniProtocol' sharing the same mode, initiator /
-- responder contexts, bytes type, monad and result types.
--
data NodeToNodeProtocols appType initiatorCtx responderCtx bytes m a b = NodeToNodeProtocols {
    -- | chain-sync mini-protocol
    --
    chainSyncProtocol    :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b,

    -- | block-fetch mini-protocol
    --
    blockFetchProtocol   :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b,

    -- | tx-submission mini-protocol
    --
    txSubmissionProtocol :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b,

    -- | keep-alive mini-protocol
    --
    keepAliveProtocol    :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b,

    -- | peer sharing mini-protocol
    --
    peerSharingProtocol  :: RunMiniProtocol appType initiatorCtx responderCtx bytes m a b

  }

-- | 'NodeToNodeProtocols' whose initiator side runs with an
-- 'ExpandedInitiatorContext' and whose responder side runs with a
-- 'ResponderContext'.
type NodeToNodeProtocolsWithExpandedCtx appType ntnAddr bytes m a b =
    NodeToNodeProtocols
      appType
      (ExpandedInitiatorContext ntnAddr m)
      (ResponderContext ntnAddr)
      bytes m a b
-- | 'NodeToNodeProtocols' whose initiator side runs with a
-- 'MinimalInitiatorContext' and whose responder side runs with a
-- 'ResponderContext'.
type NodeToNodeProtocolsWithMinimalCtx appType ntnAddr bytes m a b =
    NodeToNodeProtocols
      appType
      (MinimalInitiatorContext ntnAddr)
      (ResponderContext ntnAddr)
      bytes m a b


-- | Tunable parameters of the node-to-node mini-protocols.  They are used
-- in this module to compute the ingress queue limits of the 'chain-sync',
-- 'block-fetch' and 'tx-submission' mini-protocols.
--
data MiniProtocolParameters = MiniProtocolParameters {
      chainSyncPipeliningHighMark :: !Word16,
      -- ^ high threshold for pipelining (we will never exceed that many
      -- messages pipelined).

      chainSyncPipeliningLowMark  :: !Word16,
      -- ^ low threshold: if we hit the 'chainSyncPipeliningHighMark' we will
      -- listen for responses until there are at most
      -- 'chainSyncPipeliningLowMark' pipelined messages.
      --
      -- Must be smaller than 'chainSyncPipeliningHighMark'.
      --
      -- Note: 'chainSyncPipeliningLowMark' and 'chainSyncPipeliningHighMark'
      -- are passed to 'pipelineDecisionLowHighMark'.

      blockFetchPipeliningMax     :: !Word16,
      -- ^ maximal number of pipelined messages in 'block-fetch' mini-protocol.

      txSubmissionMaxUnacked      :: !NumTxIdsToAck
      -- ^ maximal number of unacked tx (pipelining is bounded by twice this
      -- number)
    }

-- | Default values of 'MiniProtocolParameters'.  Note that the chain-sync
-- low mark (200) is below the high mark (300), as required by the
-- documentation of 'chainSyncPipeliningLowMark'.
--
defaultMiniProtocolParameters :: MiniProtocolParameters
defaultMiniProtocolParameters = MiniProtocolParameters {
      chainSyncPipeliningLowMark  = 200
    , chainSyncPipeliningHighMark = 300
    , blockFetchPipeliningMax     = 100
    , txSubmissionMaxUnacked      = 10
  }

-- | Make an 'OuroborosApplication' for the bundle of mini-protocols that
-- make up the overall node-to-node protocol.
--
-- This function specifies the wire format protocol numbers.
--
-- The application specific protocol numbers start from 2.  The
-- @'MiniProtocolNum' 0@ is reserved for the 'Handshake' protocol, while
-- @'MiniProtocolNum' 1@ is reserved for DeltaQ messages.
-- 'Handshake' protocol is not included in 'NodeToNodeProtocols' as it runs
-- before mux is started but it reuses 'MuxBearer' to send and receive
-- messages.  Only when the handshake protocol succeeds, we will know which
-- protocols to run / multiplex.
--
-- These are chosen to not overlap with the node to client protocol numbers (and
-- the handshake protocol number).  This is not essential for correctness, but
-- is helpful to allow a single shared implementation of tools that can analyse
-- both protocols, e.g.  wireshark plugins.
--
-- The resulting bundle contains:
--
-- * hot: 'chain-sync', 'block-fetch' and 'tx-submission';
-- * warm: no mini-protocols;
-- * established: 'keep-alive' and, unless the node's own 'PeerSharing' value
--   is 'PeerSharingDisabled', 'peer-sharing'.
--
-- The 'NodeToNodeVersion' argument is currently not used.
--
nodeToNodeProtocols
  :: MiniProtocolParameters
  -> NodeToNodeProtocols muxMode initiatorCtx responderCtx bytes m a b
  -> NodeToNodeVersion
  -> PeerSharing -- ^ Node's own PeerSharing value
  -> OuroborosBundle muxMode initiatorCtx responderCtx bytes m a b
nodeToNodeProtocols miniProtocolParameters protocols _version ownPeerSharing =
    TemperatureBundle
      -- Hot protocols: 'chain-sync', 'block-fetch' and 'tx-submission'.
      (WithHot $
        case protocols of
          NodeToNodeProtocols { chainSyncProtocol,
                                blockFetchProtocol,
                                txSubmissionProtocol
                              } ->
            [ MiniProtocol {
                miniProtocolNum    = chainSyncMiniProtocolNum,
                miniProtocolLimits = chainSyncProtocolLimits miniProtocolParameters,
                miniProtocolRun    = chainSyncProtocol
              }
            , MiniProtocol {
                miniProtocolNum    = blockFetchMiniProtocolNum,
                miniProtocolLimits = blockFetchProtocolLimits miniProtocolParameters,
                miniProtocolRun    = blockFetchProtocol
              }
            , MiniProtocol {
                miniProtocolNum    = txSubmissionMiniProtocolNum,
                miniProtocolLimits = txSubmissionProtocolLimits miniProtocolParameters,
                miniProtocolRun    = txSubmissionProtocol
              }
            ])

      -- Warm protocols: reserved for 'tip-sample'.
      (WithWarm [])

      -- Established protocols: 'keep-alive' and, only when our own peer
      -- sharing is not disabled, 'peer-sharing'.
      (WithEstablished $
        case protocols of
          NodeToNodeProtocols { keepAliveProtocol, peerSharingProtocol }
            | ownPeerSharing /= PeerSharingDisabled ->
            [ MiniProtocol {
                miniProtocolNum    = keepAliveMiniProtocolNum,
                miniProtocolLimits = keepAliveProtocolLimits miniProtocolParameters,
                miniProtocolRun    = keepAliveProtocol
              }
            , MiniProtocol {
                miniProtocolNum    = peerSharingMiniProtocolNum,
                miniProtocolLimits = peerSharingProtocolLimits miniProtocolParameters,
                miniProtocolRun    = peerSharingProtocol
              }
            ]
          NodeToNodeProtocols { keepAliveProtocol }
            | otherwise ->
            [ MiniProtocol {
                miniProtocolNum    = keepAliveMiniProtocolNum,
                miniProtocolLimits = keepAliveProtocolLimits miniProtocolParameters,
                miniProtocolRun    = keepAliveProtocol
              }
            ])

-- | Add a 10% safety margin to a size: the result is @x + x `div` 10@,
-- i.e. the margin is computed with integer division.
--
addSafetyMargin :: Int -> Int
addSafetyMargin x = x + x `div` 10

-- | Ingress queue limits ('MiniProtocolLimits') of the individual
-- node-to-node mini-protocols, computed from 'MiniProtocolParameters'.
chainSyncProtocolLimits
  , blockFetchProtocolLimits
  , txSubmissionProtocolLimits
  , keepAliveProtocolLimits
  , peerSharingProtocolLimits :: MiniProtocolParameters -> MiniProtocolLimits

-- Ingress queue limit of 'chain-sync': room for
-- 'chainSyncPipeliningHighMark' messages of 1400 bytes each, plus the
-- safety margin.
chainSyncProtocolLimits MiniProtocolParameters { chainSyncPipeliningHighMark } =
  MiniProtocolLimits {
      -- The largest message over ChainSync is @MsgRollForward@ which mainly
      -- consists of a BlockHeader.
      -- TODO: 1400 comes from maxBlockHeaderSize in genesis, but should come
      -- from consensus rather than being hard coded.
      maximumIngressQueue = addSafetyMargin $
        fromIntegral chainSyncPipeliningHighMark * 1400
    }

-- Ingress queue limit of 'block-fetch': the larger of the byron era limit
-- and 'blockFetchPipeliningMax' blocks of 88kiB each, plus the safety
-- margin.
blockFetchProtocolLimits MiniProtocolParameters { blockFetchPipeliningMax } = MiniProtocolLimits {
    -- block-fetch client can pipeline at most 'blockFetchPipeliningMax'
    -- blocks (currently '100').  This is currently hard coded in
    -- 'Ouroboros.Network.BlockFetch.blockFetchLogic' (where
    -- @maxInFlightReqsPerPeer = 100@ is specified).  In the future the
    -- block fetch client will count bytes rather than blocks.  By far
    -- the largest (and the only pipelined message) in 'block-fetch'
    -- protocol is 'MsgBlock'.  Current block size limit is 88kiB
    -- (@90_112@ bytes) and `blockFetchPipeliningMax` below is set to
    -- `100`.  This means that overall queue limit must be:
    --
    --   ```
    --   100 * 88kiB = 8800kiB (about 8.6MiB)
    --   ```
    --
    -- In the byron era this limit was set to `10 * 2MiB`, we keep the more
    -- relaxed limit here.
    --
    -- NOTE(review): 2MiB is 2_097_152 bytes, while the literal used below is
    -- 2_097_154; left unchanged, confirm whether the two extra bytes are
    -- intended.
    --
    maximumIngressQueue = addSafetyMargin $
      max (10 * 2_097_154 :: Int) (fromIntegral blockFetchPipeliningMax * 90_112)
  }

-- Ingress queue limit of 'tx-submission': 'txSubmissionMaxUnacked' times the
-- combined size bound of one tx id reply (44 bytes) and one tx reply
-- (65_540 bytes), plus the safety margin.
txSubmissionProtocolLimits MiniProtocolParameters { txSubmissionMaxUnacked } = MiniProtocolLimits {
      -- tx-submission server can pipeline both 'MsgRequestTxIds' and
      -- 'MsgRequestTx'. This means that there can be many
      -- 'MsgReplyTxIds', 'MsgReplyTxs' messages in an inbound queue (their
      -- sizes are strictly greater than the corresponding request
      -- messages).
      --
      -- Each 'MsgRequestTxIds' can contain at max @maxTxIdsToRequest = 3@
      -- (defined in 'Ouroboros.Network.TxSubmission.Inbound.txSubmissionInbound')
      --
      -- Each 'MsgRequestTx' can request at max @maxTxToRequest = 2@
      -- (defined in 'Ouroboros.Network.TxSubmission.Inbound.txSubmissionInbound')
      --
      -- The 'txSubmissionInbound' server can at most put `100`
      -- unacknowledged transactions.  It also pipelines both
      -- 'MsgRequestTxIds' and 'MsgRequestTx' in turn. This means that the
      -- inbound queue can have at most `100` 'MsgRequestTxIds' and
      -- 'MsgRequestTx' which will contain a single `TxId` / `Tx`.
      --
      -- TODO: the unacknowledged transactions are configured in `NodeArgs`,
      -- and we should take this parameter as an input for this computation.
      --
      -- The upper bound of size of a single transaction is 64k, while the
      -- size of `TxId` is `34` bytes (`type TxId = Hash Tx`).
      --
      -- Ingress side of 'txSubmissionInbound'
      --
      -- - 'MsgReplyTxIds' carrying a single `TxId`:
      -- ```
      --    1  -- encodeListLen 2
      --  + 1  -- encodeWord 1
      --  + 1  -- encodeListLenIndef
      --  + 1  -- encodeListLen 2
      --  + 34 -- encode 'TxId'
      --  + 5  -- encodeWord32 (size of tx)
      --  + 1  -- encodeBreak
      --  = 44
      -- ```
      -- - 'MsgReplyTx' carrying a single 'Tx':
      -- ```
      --    1      -- encodeListLen 2
      --  + 1      -- encodeWord 3
      --  + 1      -- encodeListLenIndef
      --  + 65_536 -- 64kiB transaction
      --  + 1      -- encodeBreak
      --  = 65_540
      -- ```
      --
      -- On the ingress side of 'txSubmissionOutbound' we can have at most
      -- 'txSubmissionMaxUnacked' 'MsgRequestTxIds' and the same amount of
      -- 'MsgRequestTx' containing a single 'TxId'.  The size of
      -- 'MsgRequestTxIds' is much smaller than 'MsgReplyTx', and the size
      -- of 'MsgRequestTx' with a single 'TxId' is smaller than
      -- 'MsgReplyTxIds' which contains a single 'TxId' (it just contains
      -- the 'TxId' without the size of 'Tx' in bytes).  So the ingress
      -- queue of 'txSubmissionOutbound' is bounded by the ingress side of
      -- the 'txSubmissionInbound'
      --
      -- For 'txSubmissionMaxUnacked' equal to '100' the upper bound is
      -- `100 * (44 + 65_540) = 6_558_400` (note that
      -- 'defaultMiniProtocolParameters' sets it to '10'); we add 10% as a
      -- safety margin.
      --
      maximumIngressQueue = addSafetyMargin $
          fromIntegral txSubmissionMaxUnacked * (44 + 65_540)
    }

-- Ingress queue limit of 'keep-alive': a constant 1280 bytes plus the safety
-- margin; the 'MiniProtocolParameters' are ignored.
keepAliveProtocolLimits _ =
  MiniProtocolLimits {
      -- One small outstanding message.
      maximumIngressQueue = addSafetyMargin 1280
    }

-- Ingress queue limit of 'peer-sharing': a constant @4 * 1440@ bytes (no
-- safety margin is added); the 'MiniProtocolParameters' are ignored.
peerSharingProtocolLimits _ =
  MiniProtocolLimits {
  -- This protocol does not need to be pipelined and a peer can only ask
  -- for a maximum of 255 peers each time. Hence a reply can have up to
  -- 255 IP (IPv4 or IPv6) addresses so 255 * 16 = 4080. TCP has an initial
  -- window size of 4 and a TCP segment is 1440, which gives us 4 * 1440 =
  -- 5760 bytes to fit into a single RTT. So setting the maximum ingress
  -- queue to be a single RTT should be enough to cover for CBOR overhead.
  maximumIngressQueue = 4 * 1440
  }

-- | Wire protocol number of the 'chain-sync' mini-protocol.
chainSyncMiniProtocolNum :: MiniProtocolNum
chainSyncMiniProtocolNum = MiniProtocolNum 2

-- | Wire protocol number of the 'block-fetch' mini-protocol.
blockFetchMiniProtocolNum :: MiniProtocolNum
blockFetchMiniProtocolNum = MiniProtocolNum 3

-- | Wire protocol number of the 'tx-submission' mini-protocol.
txSubmissionMiniProtocolNum :: MiniProtocolNum
txSubmissionMiniProtocolNum = MiniProtocolNum 4

-- | Wire protocol number of the 'keep-alive' mini-protocol.
keepAliveMiniProtocolNum :: MiniProtocolNum
keepAliveMiniProtocolNum = MiniProtocolNum 8

-- | Wire protocol number of the 'peer-sharing' mini-protocol.
peerSharingMiniProtocolNum :: MiniProtocolNum
peerSharingMiniProtocolNum = MiniProtocolNum 10

-- | A specialised version of @'Ouroboros.Network.Socket.connectToNode'@.
--
-- It fixes the node-to-node handshake codec, handshake time limits, version
-- data codec and handshake callbacks ('acceptableVersion' and
-- 'queryVersion'), uses 'makeSocketBearer', and configures the outbound
-- socket before it is used: the 'Socket.NoDelay' option is set to @1@ and
-- 'Socket.Linger' is switched on with a linger time of @0@.
--
-- The remaining arguments (the versions, the optional local address and the
-- remote address) are passed on to 'connectToNode' unchanged.
--
connectTo
  :: Snocket IO Socket.Socket Socket.SockAddr
  -> NetworkConnectTracers Socket.SockAddr NodeToNodeVersion
  -> Versions NodeToNodeVersion
              NodeToNodeVersionData
              (OuroborosApplicationWithMinimalCtx
                 Mx.InitiatorMode Socket.SockAddr BL.ByteString IO a b)
  -> Maybe Socket.SockAddr
  -> Socket.SockAddr
  -> IO (Either SomeException (Either a b))
connectTo sn tr =
    connectToNode sn makeSocketBearer
                  ConnectToArgs {
                    ctaHandshakeCodec      = nodeToNodeHandshakeCodec,
                    ctaHandshakeTimeLimits = timeLimitsHandshake,
                    ctaVersionDataCodec    = cborTermVersionDataCodec nodeToNodeCodecCBORTerm,
                    ctaConnectTracers      = tr,
                    ctaHandshakeCallbacks  = HandshakeCallbacks acceptableVersion queryVersion
                  }
                  configureOutboundSocket
  where
    -- Socket options applied to every outbound connection.
    configureOutboundSocket :: Socket -> IO ()
    configureOutboundSocket sock = do
        Socket.setSocketOption sock Socket.NoDelay 1
        Socket.setSockOpt sock Socket.Linger
                              (StructLinger { sl_onoff  = 1,
                                              sl_linger = 0 })


-- | A specialised version of @'Ouroboros.Network.Socket.withServerNode'@.
-- It forks a thread which runs an accept loop (server thread):
--
-- * when the server thread throws an exception the main thread rethrows
--   it (by 'Async.wait')
-- * when an async exception is thrown to kill the main thread the server thread
--   will be cancelled as well (by 'withAsync')
--
-- The handshake is fixed to the node-to-node codec, time limits, version data
-- codec and callbacks; only the tracers, state, limits, socket, application
-- versions and error policies are supplied by the caller.
--
withServer
  :: SocketSnocket
  -> NetworkServerTracers Socket.SockAddr NodeToNodeVersion
  -> NetworkMutableState Socket.SockAddr
  -> AcceptedConnectionsLimit
  -> Socket.Socket
  -- ^ a configured socket to be used by the server.  The server will call
  -- `bind` and `listen` methods but it will not set any socket or tcp options
  -- on it.
  -> Versions NodeToNodeVersion
              NodeToNodeVersionData
              (OuroborosApplicationWithMinimalCtx
                 Mx.ResponderMode Socket.SockAddr BL.ByteString IO a b)
  -> ErrorPolicies
  -> IO Void
withServer sn tracers networkState acceptedConnectionsLimit sd versions errPolicies =
  withServerNode'
    sn
    makeSocketBearer
    tracers
    networkState
    acceptedConnectionsLimit
    sd
    -- node-to-node handshake: codec, time limits, version data codec and
    -- accept / query callbacks
    nodeToNodeHandshakeCodec
    timeLimitsHandshake
    (cborTermVersionDataCodec nodeToNodeCodecCBORTerm)
    (HandshakeCallbacks acceptableVersion queryVersion)
    -- wrap every application in 'SomeResponderApplication', as required by
    -- 'withServerNode''
    (SomeResponderApplication <$> versions)
    errPolicies
    -- the address argument is ignored; block on the server thread so that
    -- its exceptions are rethrown here
    (\_ async -> Async.wait async)


-- | 'ipSubscriptionWorker' which starts given application versions on each
-- established connection.
--
-- The subscription and error policy tracers are handed to
-- 'Subscription.ipSubscriptionWorker'; the mux and handshake tracers are
-- handed to 'connectToNode'' (through 'NetworkConnectTracers'), which is run
-- on every socket the subscription worker provides.
--
ipSubscriptionWorker
    :: forall mode x y.
       ( HasInitiator mode ~ True )
    => SocketSnocket
    -> NetworkIPSubscriptionTracers Socket.SockAddr NodeToNodeVersion
    -> NetworkMutableState Socket.SockAddr
    -> IPSubscriptionParams ()
    -> Versions
        NodeToNodeVersion
        NodeToNodeVersionData
        (OuroborosApplicationWithMinimalCtx
           mode Socket.SockAddr BL.ByteString IO x y)
    -> IO Void
ipSubscriptionWorker
  sn
  NetworkSubscriptionTracers
    { nsSubscriptionTracer
    , nsMuxTracer
    , nsHandshakeTracer
    , nsErrorPolicyTracer
    }
  networkState
  subscriptionParams
  versions
    = Subscription.ipSubscriptionWorker
        sn
        nsSubscriptionTracer
        nsErrorPolicyTracer
        networkState
        subscriptionParams
        -- per-connection action: run the handshake and the negotiated
        -- application on the socket; its result (an exception or the
        -- application's return value) is discarded by 'void'
        (void . connectToNode'
          sn
          makeSocketBearer
          ConnectToArgs {
            ctaHandshakeCodec      = nodeToNodeHandshakeCodec,
            ctaHandshakeTimeLimits = timeLimitsHandshake,
            ctaVersionDataCodec    = cborTermVersionDataCodec nodeToNodeCodecCBORTerm,
            ctaConnectTracers      = NetworkConnectTracers nsMuxTracer nsHandshakeTracer,
            ctaHandshakeCallbacks  = HandshakeCallbacks acceptableVersion queryVersion
          }
          versions)


-- | 'dnsSubscriptionWorker' which starts given application versions on each
-- established connection.
--
-- The subscription, DNS and error policy tracers are handed to
-- 'Subscription.dnsSubscriptionWorker'; the mux and handshake tracers are
-- handed to 'connectToNode'' (through 'NetworkConnectTracers'), which is run
-- on every socket the subscription worker provides.
--
dnsSubscriptionWorker
    :: forall mode x y.
       ( HasInitiator mode ~ True )
    => SocketSnocket
    -> NetworkDNSSubscriptionTracers NodeToNodeVersion Socket.SockAddr
    -> NetworkMutableState Socket.SockAddr
    -> DnsSubscriptionParams ()
    -> Versions
        NodeToNodeVersion
        NodeToNodeVersionData
        (OuroborosApplicationWithMinimalCtx
           mode Socket.SockAddr BL.ByteString IO x y)
    -> IO Void
dnsSubscriptionWorker
  sn
  NetworkDNSSubscriptionTracers
    { ndstSubscriptionTracer
    , ndstDnsTracer
    , ndstMuxTracer
    , ndstHandshakeTracer
    , ndstErrorPolicyTracer
    }
  networkState
  subscriptionParams
  versions =
    Subscription.dnsSubscriptionWorker
      sn
      ndstSubscriptionTracer
      ndstDnsTracer
      ndstErrorPolicyTracer
      networkState
      subscriptionParams
      -- per-connection action: run the handshake and the negotiated
      -- application on the socket; its result (an exception or the
      -- application's return value) is discarded by 'void'
      (void . connectToNode'
        sn
        makeSocketBearer
        ConnectToArgs {
          ctaHandshakeCodec      = nodeToNodeHandshakeCodec,
          ctaHandshakeTimeLimits = timeLimitsHandshake,
          ctaVersionDataCodec    = cborTermVersionDataCodec nodeToNodeCodecCBORTerm,
          ctaConnectTracers      = NetworkConnectTracers ndstMuxTracer ndstHandshakeTracer,
          ctaHandshakeCallbacks  = HandshakeCallbacks acceptableVersion queryVersion
        }
        versions)


-- | A minimal error policy for remote peers, which only handles exceptions
-- raised by `ouroboros-network`.
--
-- 'epAppErrorPolicies' covers exceptions raised while an application is
-- running on a connection, 'epConErrorPolicies' covers exceptions raised
-- while connecting.  The delays used below are defined in the @where@ clause.
--
remoteNetworkErrorPolicy :: ErrorPolicies
remoteNetworkErrorPolicy = ErrorPolicies {
      epAppErrorPolicies = [
          -- Handshake client protocol error: we either did not recognise received
          -- version or we refused it.  This is only for outbound connections,
          -- thus we suspend the consumer.
          ErrorPolicy
            $ \(_ :: HandshakeProtocolError NodeToNodeVersion)
                  -> Just misconfiguredPeer

          -- deserialisation failure; this means that the remote peer is either
          -- buggy, adversarial, or the connection returns garbage.  In the last
          -- case it's also good to shutdown both the consumer and the
          -- producer, as it's likely that the other side of the connection
          -- will return garbage as well.
        , ErrorPolicy
            $ \(_ :: DecoderFailure)
                  -> Just theyBuggyOrEvil

          -- protocol limits: exceeding a size limit is treated like a buggy
          -- or adversarial peer; exceeding a time limit only suspends the
          -- consumer for a 'shortDelay'.
        , ErrorPolicy
            $ \(msg :: ProtocolLimitFailure)
                  -> case msg of
                       ExceededSizeLimit{} -> Just theyBuggyOrEvil
                       ExceededTimeLimit{} -> Just (SuspendConsumer shortDelay)

          -- multiplexer errors: protocol violations are treated like a buggy
          -- or adversarial peer; closed bearers, IO exceptions, timeouts and
          -- shutdown suspend the peer with 'veryShortDelay' / 'shortDelay'.
        , ErrorPolicy
            $ \(e :: Mx.Error)
                  -> case e of
                       Mx.UnknownMiniProtocol {} -> Just theyBuggyOrEvil
                       Mx.IngressQueueOverRun {} -> Just theyBuggyOrEvil
                       Mx.InitiatorOnly {}       -> Just theyBuggyOrEvil

                       -- in case of bearer closed / or IOException we suspend
                       -- the peer for a short time
                       --
                       -- TODO: an exponential backoff would be nicer than a fixed 20s
                       -- TODO: right now we cannot suspend just the
                       -- 'responder'.  If a 'responder' throws 'MuxError' we
                       -- might not want to shutdown the consumer (which is
                       -- using different connection), as we do below:
                       Mx.BearerClosed {}        -> Just (SuspendPeer veryShortDelay shortDelay)
                       Mx.IOException {}         -> Just (SuspendPeer veryShortDelay shortDelay)
                       Mx.SDUDecodeError {}      -> Just theyBuggyOrEvil
                       Mx.SDUReadTimeout         -> Just (SuspendPeer veryShortDelay shortDelay)
                       Mx.SDUWriteTimeout        -> Just (SuspendPeer veryShortDelay shortDelay)
                       Mx.Shutdown {}            -> Just (SuspendPeer veryShortDelay shortDelay)

          -- multiplexer runtime errors: an unknown internal error is
          -- escalated with 'Throw', the others suspend the peer for a
          -- 'shortDelay'.
        , ErrorPolicy
            $ \(e :: Mx.RuntimeError)
                  -> case e of
                       Mx.ProtocolAlreadyRunning       {} -> Just (SuspendPeer shortDelay shortDelay)
                       Mx.UnknownProtocolInternalError {} -> Just Throw
                       Mx.BlockedOnCompletionVar       {} -> Just (SuspendPeer shortDelay shortDelay)

          -- Error policy for TxSubmission protocol: outbound side (client role)
        , ErrorPolicy
            $ \(_ :: TxOutbound.TxSubmissionProtocolError)
                  -> Just theyBuggyOrEvil

          -- Error policy for TxSubmission protocol: inbound side (server role)
        , ErrorPolicy
            $ \(_ :: TxInbound.TxSubmissionProtocolError)
                  -> Just theyBuggyOrEvil

          -- Error policy for BlockFetch protocol: consumer side (client role)
        , ErrorPolicy
            $ \(_ :: BlockFetchProtocolFailure)
                  -> Just theyBuggyOrEvil

          -- Error thrown by 'IOManager', this is fatal on Windows, and it will
          -- never fire on other platforms.
        , ErrorPolicy
            $ \(_ :: IOManagerError)
                  -> Just Throw
        ],

      -- Exception raised during connect; suspend connecting to that peer for
      -- a 'shortDelay'
      epConErrorPolicies = [
          ErrorPolicy
            $ \(_ :: IOException)
                  -> Just (SuspendConsumer shortDelay)

        , ErrorPolicy
            $ \(_ :: IOManagerError)
                  -> Just Throw

          -- Multiple connection attempts are run in parallel and the last to
          -- finish are cancelled. There may be nothing wrong with the peer,
          -- it could just be slow to respond.
        , ErrorPolicy
            $ \(_ :: SubscriberError)
                  -> Just (SuspendConsumer veryShortDelay)
        ]
    }
  where
    -- suspend the whole peer, using 'defaultDelay' for both arguments of
    -- 'SuspendPeer'
    theyBuggyOrEvil :: SuspendDecision DiffTime
    theyBuggyOrEvil = SuspendPeer defaultDelay defaultDelay

    -- suspend only the consumer, for 'defaultDelay'
    misconfiguredPeer :: SuspendDecision DiffTime
    misconfiguredPeer = SuspendConsumer defaultDelay

    defaultDelay :: DiffTime
    defaultDelay = 200 -- seconds

    shortDelay :: DiffTime
    shortDelay = 20 -- seconds

    veryShortDelay :: DiffTime
    veryShortDelay = 1 -- seconds

-- | Error policy for local clients.  This is equivalent to
-- 'nullErrorPolicies', but explicit in the errors which can be caught.
--
-- We are very permissive here, and very strict in the
-- `NodeToClient.networkErrorPolicy`.  After any failure the client will be
-- killed and not penalised by this policy.  This allows to restart the local
-- client without a delay.
--
-- Every handler below returns 'Nothing', i.e. no 'SuspendDecision' is made
-- for any of the listed exception types.
--
localNetworkErrorPolicy :: ErrorPolicies
localNetworkErrorPolicy = ErrorPolicies {
      epAppErrorPolicies = [
          -- exception thrown by `runPeerWithLimits`
          ErrorPolicy
            $ \(_ :: ProtocolLimitFailure)
                  -> Nothing

          -- deserialisation failure
        , ErrorPolicy
            $ \(_ :: CBOR.DeserialiseFailure)
                  -> Nothing

          -- multiplexer error (e.g. the connection was unexpectedly closed);
          -- unlike in 'remoteNetworkErrorPolicy' the peer is not suspended
        , ErrorPolicy
            $ \(_ :: Mx.Error)
                  -> Nothing
        ],

      -- The node never connects to a local client
      epConErrorPolicies = []
    }

-- | Alias of 'Socket.SockAddr'.
type RemoteAddress      = Socket.SockAddr

-- 'RemoteAddress' is a synonym of 'Socket.SockAddr', so this is an instance
-- for 'Socket.SockAddr'; the proxy is always rendered as @"SockAddr"@.
-- NOTE(review): presumably the reason for @-Wno-orphans@ at the top of the
-- module - confirm.
instance ShowProxy RemoteAddress where
  showProxy _ = "SockAddr"

-- | A 'ConnectionId' specialised to 'RemoteAddress'.
type RemoteConnectionId = ConnectionId RemoteAddress