{-# LANGUAGE DataKinds           #-}
{-# LANGUAGE GADTs               #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE PolyKinds           #-}
{-# LANGUAGE RankNTypes          #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies        #-}
{-# LANGUAGE TypeOperators       #-}

-- | This is the starting point for a module that will bring together the
-- overall node to client protocol, as a collection of mini-protocols.
--
module Ouroboros.Network.NodeToClient
  ( nodeToClientProtocols
  , NodeToClientProtocols (..)
  , NodeToClientVersion (..)
  , NodeToClientVersionData (..)
  , NetworkConnectTracers (..)
  , nullNetworkConnectTracers
  , connectTo
  , connectToWithMux
  , NetworkServerTracers (..)
  , nullNetworkServerTracers
  , NetworkMutableState (..)
  , newNetworkMutableState
  , newNetworkMutableStateSTM
  , cleanNetworkMutableState
  , withServer
  , NetworkClientSubcriptionTracers
  , NetworkSubscriptionTracers (..)
  , ClientSubscriptionParams (..)
  , ncSubscriptionWorker
    -- * Null Protocol Peers
  , chainSyncPeerNull
  , localStateQueryPeerNull
  , localTxSubmissionPeerNull
  , localTxMonitorPeerNull
    -- * Re-exported network interface
  , IOManager (..)
  , AssociateWithIOCP
  , withIOManager
  , LocalSnocket
  , localSnocket
  , LocalSocket (..)
  , LocalAddress (..)
    -- * Versions
  , Versions (..)
  , versionedNodeToClientProtocols
  , simpleSingletonVersions
  , foldMapVersions
  , combineVersions
    -- ** Codecs
  , nodeToClientHandshakeCodec
  , nodeToClientVersionCodec
  , nodeToClientCodecCBORTerm
    -- * Re-exports
  , ConnectionId (..)
  , MinimalInitiatorContext (..)
  , ResponderContext (..)
  , LocalConnectionId
  , ErrorPolicies (..)
  , networkErrorPolicies
  , nullErrorPolicies
  , ErrorPolicy (..)
  , ErrorPolicyTrace (..)
  , WithAddr (..)
  , SuspendDecision (..)
  , TraceSendRecv (..)
  , ProtocolLimitFailure
  , Handshake
  , LocalAddresses (..)
  , SubscriptionTrace (..)
  , HandshakeTr
  ) where

import Cardano.Prelude (FatalError)

import Control.Concurrent.Async qualified as Async
import Control.Exception (ErrorCall, IOException, SomeException)
import Control.Monad (forever)
import Control.Monad.Class.MonadTimer.SI

import Codec.CBOR.Term qualified as CBOR
import Data.ByteString.Lazy qualified as BL
import Data.Functor (void)
import Data.Functor.Contravariant (contramap)
import Data.Functor.Identity (Identity (..))
import Data.Kind (Type)
import Data.Void (Void, absurd)

import Network.Mux qualified as Mx
import Network.TypedProtocol.Peer.Client
import Network.TypedProtocol.Stateful.Peer.Client qualified as Stateful

import Ouroboros.Network.Context
import Ouroboros.Network.Driver (TraceSendRecv (..))
import Ouroboros.Network.Driver.Limits (ProtocolLimitFailure (..))
import Ouroboros.Network.Driver.Simple (DecoderFailure)
import Ouroboros.Network.ErrorPolicy
import Ouroboros.Network.IOManager
import Ouroboros.Network.Mux
import Ouroboros.Network.NodeToClient.Version
import Ouroboros.Network.Protocol.ChainSync.Client as ChainSync
import Ouroboros.Network.Protocol.ChainSync.Type qualified as ChainSync
import Ouroboros.Network.Protocol.Handshake.Codec
import Ouroboros.Network.Protocol.Handshake.Type
import Ouroboros.Network.Protocol.Handshake.Version hiding (Accept)
import Ouroboros.Network.Protocol.LocalStateQuery.Client as LocalStateQuery
import Ouroboros.Network.Protocol.LocalStateQuery.Type qualified as LocalStateQuery
import Ouroboros.Network.Protocol.LocalTxMonitor.Client as LocalTxMonitor
import Ouroboros.Network.Protocol.LocalTxMonitor.Type qualified as LocalTxMonitor
import Ouroboros.Network.Protocol.LocalTxSubmission.Client as LocalTxSubmission
import Ouroboros.Network.Protocol.LocalTxSubmission.Type qualified as LocalTxSubmission
import Ouroboros.Network.Snocket
import Ouroboros.Network.Socket
import Ouroboros.Network.Subscription.Client (ClientSubscriptionParams (..))
import Ouroboros.Network.Subscription.Client qualified as Subscription
import Ouroboros.Network.Subscription.Ip (SubscriptionTrace (..))
import Ouroboros.Network.Subscription.Worker (LocalAddresses (..))
import Ouroboros.Network.Tracers

-- | Event type traced for the node-to-client handshake: 'TraceSendRecv'
-- events of the 'Handshake' mini-protocol (version data carried as
-- 'CBOR.Term'), wrapped in 'Mx.WithBearer' together with the 'ConnectionId'
-- of the connection.
--
type HandshakeTr ntcAddr ntcVersion
  = Mx.WithBearer
      (ConnectionId ntcAddr)
      (TraceSendRecv (Handshake ntcVersion CBOR.Term))


-- | Record of node-to-client mini protocols.
--
-- Each field holds the runner ('RunMiniProtocolWithMinimalCtx') of one of the
-- four mini-protocols; 'nodeToClientProtocols' assigns the wire-level
-- protocol numbers.
--
data NodeToClientProtocols appType ntcAddr bytes m a b = NodeToClientProtocols {
    -- | local chain-sync mini-protocol
    --
    localChainSyncProtocol    :: RunMiniProtocolWithMinimalCtx
                                   appType ntcAddr bytes m a b,

    -- | local tx-submission mini-protocol
    --
    localTxSubmissionProtocol :: RunMiniProtocolWithMinimalCtx
                                   appType ntcAddr bytes m a b,

    -- | local state-query mini-protocol
    --
    localStateQueryProtocol   :: RunMiniProtocolWithMinimalCtx
                                   appType ntcAddr bytes m a b,

    -- | local tx-monitor mini-protocol
    --
    localTxMonitorProtocol    :: RunMiniProtocolWithMinimalCtx
                                   appType ntcAddr bytes m a b
  }


-- | Make an 'OuroborosApplication' for the bundle of mini-protocols that
-- make up the overall node-to-client protocol.
--
-- This function specifies the wire format protocol numbers as well as the
-- protocols that run for each 'NodeToClientVersion'.
--
-- They are chosen to not overlap with the node to node protocol numbers.
-- This is not essential for correctness, but is helpful to allow a single
-- shared implementation of tools that can analyse both protocols, e.g.
-- wireshark plugins.
--
-- NOTE: the 'NodeToClientVersion' argument is currently ignored: the same
-- four mini-protocols are included for every version.
--
nodeToClientProtocols
  :: NodeToClientProtocols appType addr bytes m a b
  -> NodeToClientVersion
  -> OuroborosApplicationWithMinimalCtx appType addr bytes m a b
nodeToClientProtocols protocols _version =
    OuroborosApplication $
      case protocols of
        NodeToClientProtocols {
            localChainSyncProtocol,
            localTxSubmissionProtocol,
            localStateQueryProtocol,
            localTxMonitorProtocol
          } ->
          [ localChainSyncMiniProtocol localChainSyncProtocol
          , localTxSubmissionMiniProtocol localTxSubmissionProtocol
          , localStateQueryMiniProtocol localStateQueryProtocol
          , localTxMonitorMiniProtocol localTxMonitorProtocol
          ]

  where
    -- Each helper wraps a mini-protocol runner in a 'MiniProtocol' record,
    -- fixing its protocol number and applying
    -- 'maximumMiniProtocolLimits'.  The numbers used are 5, 6, 7 and 9
    -- (8 is not assigned here).
    localChainSyncMiniProtocol localChainSyncProtocol = MiniProtocol {
        miniProtocolNum    = MiniProtocolNum 5,
        miniProtocolLimits = maximumMiniProtocolLimits,
        miniProtocolRun    = localChainSyncProtocol
      }
    localTxSubmissionMiniProtocol localTxSubmissionProtocol = MiniProtocol {
        miniProtocolNum    = MiniProtocolNum 6,
        miniProtocolLimits = maximumMiniProtocolLimits,
        miniProtocolRun    = localTxSubmissionProtocol
      }
    localStateQueryMiniProtocol localStateQueryProtocol = MiniProtocol {
        miniProtocolNum    = MiniProtocolNum 7,
        miniProtocolLimits = maximumMiniProtocolLimits,
        miniProtocolRun    = localStateQueryProtocol
      }
    localTxMonitorMiniProtocol localTxMonitorProtocol = MiniProtocol {
        miniProtocolNum    = MiniProtocolNum 9,
        miniProtocolLimits = maximumMiniProtocolLimits,
        miniProtocolRun    = localTxMonitorProtocol
    }

-- | Limits shared by all node-to-client mini-protocols built in
-- 'nodeToClientProtocols': the ingress queue bound is set to @0xffffffff@.
--
maximumMiniProtocolLimits :: MiniProtocolLimits
maximumMiniProtocolLimits =
    MiniProtocolLimits {
      maximumIngressQueue = 0xffffffff
    }


-- | 'Versions' containing a single version of 'nodeToClientProtocols'.
--
-- The given version number is used both as the key of the singleton
-- 'Versions' and as the version passed to 'nodeToClientProtocols'.
--
versionedNodeToClientProtocols
    :: NodeToClientVersion
    -> NodeToClientVersionData
    -> NodeToClientProtocols appType LocalAddress bytes m a b
    -> Versions NodeToClientVersion
                NodeToClientVersionData
                (OuroborosApplicationWithMinimalCtx appType LocalAddress bytes m a b)
versionedNodeToClientProtocols versionNumber versionData protocols =
    simpleSingletonVersions
      versionNumber
      versionData
      (nodeToClientProtocols protocols versionNumber)

-- | A specialised version of 'Ouroboros.Network.Socket.connectToNode'.  It is
-- a general purpose function which can connect using any version of the
-- protocol.  This is mostly useful for future enhancements.
--
-- The connection is made to the local address derived from the given path
-- (via 'localAddressFromPath'), without binding a local address.
--
connectTo
  :: LocalSnocket
  -- ^ callback constructed by 'Ouroboros.Network.IOManager.withIOManager'
  -> NetworkConnectTracers LocalAddress NodeToClientVersion
  -> Versions NodeToClientVersion
              NodeToClientVersionData
              (OuroborosApplicationWithMinimalCtx
                 Mx.InitiatorMode LocalAddress BL.ByteString IO a Void)
  -- ^ A dictionary of protocol versions & applications to run on an established
  -- connection.  The application to run will be chosen by initial handshake
  -- protocol (the highest shared version will be chosen).
  -> FilePath
  -- ^ path of the unix socket or named pipe
  -> IO (Either SomeException a)
connectTo snocket tracers versions path =
    fmap fn <$>
    connectToNode
      snocket
      makeLocalBearer
      ConnectToArgs {
        ctaHandshakeCodec      = nodeToClientHandshakeCodec,
        ctaHandshakeTimeLimits = noTimeLimitsHandshake,
        ctaVersionDataCodec    = cborTermVersionDataCodec nodeToClientCodecCBORTerm,
        ctaConnectTracers      = tracers,
        ctaHandshakeCallbacks  = HandshakeCallbacks acceptableVersion queryVersion
      }
      mempty
      versions
      Nothing
      (localAddressFromPath path)
  where
    -- 'connectToNode' returns @Either a b@; here @b ~ Void@, so the 'Right'
    -- case is uninhabited and can be eliminated with 'absurd'.
    fn :: forall x. Either x Void -> x
    fn = either id absurd

-- | A version of `connectTo` which exposes `Mx.Mux` interfaces which allow
-- running mini-protocols and handling their termination (e.g. restarting them
-- when they terminate or error).
--
connectToWithMux
  :: LocalSnocket
  -- ^ callback constructed by 'Ouroboros.Network.IOManager.withIOManager'
  -> NetworkConnectTracers LocalAddress NodeToClientVersion
  -> Versions NodeToClientVersion
              NodeToClientVersionData
              (OuroborosApplicationWithMinimalCtx
                 Mx.InitiatorMode LocalAddress BL.ByteString IO a b)
  -- ^ A dictionary of protocol versions & applications to run on an established
  -- connection.  The application to run will be chosen by initial handshake
  -- protocol (the highest shared version will be chosen).
  -> FilePath
  -- ^ path of the unix socket or named pipe
  -> (    ConnectionId LocalAddress
       -> NodeToClientVersion
       -> NodeToClientVersionData
       -> OuroborosApplicationWithMinimalCtx Mx.InitiatorMode LocalAddress BL.ByteString IO a b
       -> Mx.Mux Mx.InitiatorMode IO
       -> Async.Async ()
       -> IO x)
  -- ^ callback which has access to negotiated protocols and mux handle created for
  -- that connection.  The `Async` is a handle to the thread which runs
  -- `Mx.runMux`.  The `Mux` handle allows scheduling mini-protocols.
  --
  -- NOTE: when the callback returns or errors, the mux thread will be killed.
  -> IO x
connectToWithMux snocket tracers versions path k =
  connectToNodeWithMux
    snocket
    makeLocalBearer
    ConnectToArgs {
      ctaHandshakeCodec      = nodeToClientHandshakeCodec,
      ctaHandshakeTimeLimits = noTimeLimitsHandshake,
      ctaVersionDataCodec    = cborTermVersionDataCodec nodeToClientCodecCBORTerm,
      ctaConnectTracers      = tracers,
      ctaHandshakeCallbacks  = HandshakeCallbacks acceptableVersion queryVersion
    }
    mempty
    versions
    Nothing
    (localAddressFromPath path)
    k



-- | A specialised version of 'Ouroboros.Network.Socket.withServerNode'.
--
-- Comments to 'Ouroboros.Network.NodeToNode.withServer' apply here as well.
--
-- The server thread is awaited with 'Async.wait', so this function only
-- returns by propagating an exception (its result type is 'Void').
--
withServer
  :: LocalSnocket
  -> NetworkServerTracers LocalAddress NodeToClientVersion
  -> NetworkMutableState LocalAddress
  -> LocalSocket
  -> Versions NodeToClientVersion
              NodeToClientVersionData
              (OuroborosApplicationWithMinimalCtx
                 Mx.ResponderMode LocalAddress BL.ByteString IO a b)
  -> ErrorPolicies
  -> IO Void
withServer sn tracers networkState sd versions errPolicies =
  withServerNode'
    sn
    makeLocalBearer
    tracers
    networkState
    -- both numeric limits are set to 'maxBound' and the time component to 0
    (AcceptedConnectionsLimit maxBound maxBound 0)
    sd
    nodeToClientHandshakeCodec
    noTimeLimitsHandshake
    (cborTermVersionDataCodec nodeToClientCodecCBORTerm)
    (HandshakeCallbacks acceptableVersion queryVersion)
    (SomeResponderApplication <$> versions)
    errPolicies
    -- ignore the bound address and block on the server thread
    (\_ async -> Async.wait async)

-- | 'NetworkSubscriptionTracers' specialised for node-to-client use:
-- subscription traces are wrapped in 'Identity', addresses are
-- 'LocalAddress' and version numbers are 'NodeToClientVersion'.
--
type NetworkClientSubcriptionTracers
    = NetworkSubscriptionTracers Identity LocalAddress NodeToClientVersion


-- | 'ncSubscriptionWorker' which starts given application versions on each
-- established connection.
--
-- The worker delegates to 'Subscription.clientSubscriptionWorker'.  For each
-- 'LocalSocket' handed to the callback it runs 'connectToNode'' with the
-- node-to-client handshake codec, 'noTimeLimitsHandshake', the CBOR-term
-- version data codec and the 'acceptableVersion' / 'queryVersion' handshake
-- callbacks; the result of 'connectToNode'' is discarded with 'void'.
--
-- Arguments, in order:
--
-- * the 'LocalSnocket' used both by the subscription worker and by
--   'connectToNode'';
-- * the tracers; the subscription tracer receives values wrapped in
--   'Identity', the mux and handshake tracers are passed on as
--   'NetworkConnectTracers';
-- * the shared 'NetworkMutableState';
-- * the 'ClientSubscriptionParams';
-- * the application 'Versions' to negotiate and run.
--
-- The result type is 'Void': the worker does not return normally.
--
ncSubscriptionWorker
    :: forall mode x y.
       ( HasInitiator mode ~ True
       )
    => LocalSnocket
    -> NetworkClientSubcriptionTracers
    -> NetworkMutableState LocalAddress
    -> ClientSubscriptionParams ()
    -> Versions
        NodeToClientVersion
        NodeToClientVersionData
        (OuroborosApplicationWithMinimalCtx
           mode LocalAddress BL.ByteString IO x y)
    -> IO Void
ncSubscriptionWorker
  sn
  NetworkSubscriptionTracers
    { nsSubscriptionTracer
    , nsMuxTracer
    , nsHandshakeTracer
    , nsErrorPolicyTracer
    }
  networkState
  subscriptionParams
  versions
    = Subscription.clientSubscriptionWorker
        sn
        -- the worker traces bare 'SubscriptionTrace's; wrap them in
        -- 'Identity' to fit the caller-supplied tracer
        (Identity `contramap` nsSubscriptionTracer)
        nsErrorPolicyTracer
        networkState
        subscriptionParams
        -- per-connection action: negotiate a version and run the
        -- application, ignoring its result
        (void . connectToNode'
          sn
          makeLocalBearer
          ConnectToArgs {
            ctaHandshakeCodec      = nodeToClientHandshakeCodec,
            ctaHandshakeTimeLimits = noTimeLimitsHandshake,
            ctaVersionDataCodec    = cborTermVersionDataCodec nodeToClientCodecCBORTerm,
            ctaConnectTracers      = NetworkConnectTracers nsMuxTracer nsHandshakeTracer,
            ctaHandshakeCallbacks  = HandshakeCallbacks acceptableVersion queryVersion
          }
          versions)

-- | 'ErrorPolicies' for client application.  Additional rules can be added by
-- means of a 'Semigroup' instance of 'ErrorPolicies'.
--
-- These error policies will try to preserve `subscriptionWorker`, e.g. if the
-- connect function throws an `IOException` we will suspend it for
-- a 'shortDelay', and try to re-connect.
--
-- This allows recovery from a situation where a node temporarily shuts down,
-- or when running a client application which is subscribed to more than one
-- node (possibly over network).
--
-- Summary of the decisions made below:
--
-- * application errors which indicate a bug on our side (handshake protocol
--   errors, protocol limit failures, decoder failures, most mux errors and
--   all mux runtime errors) map to 'Throw';
-- * mux bearer closed, mux IO exceptions and SDU read / write timeouts
--   suspend the peer for 'shortDelay';
-- * 'IOManagerError', 'ErrorCall' and 'FatalError' map to 'Throw';
-- * an 'IOException' raised while connecting suspends the peer for
--   'shortDelay'.
--
networkErrorPolicies :: ErrorPolicies
networkErrorPolicies = ErrorPolicies
    { epAppErrorPolicies = [
        -- Handshake client protocol error: we either did not recognise received
        -- version or we refused it.  This is only for outbound connections to
        -- a local node, thus we throw the exception.
        ErrorPolicy
          $ \(_ :: HandshakeProtocolError NodeToClientVersion)
                -> Just ourBug

        -- exception thrown by `runPeerWithLimits`
        -- trusted node sent too much input
      , ErrorPolicy
          $ \(_ :: ProtocolLimitFailure)
                -> Just ourBug

        -- deserialisation failure of a message from a trusted node
      , ErrorPolicy
          $ \(_ :: DecoderFailure)
                -> Just ourBug

        -- errors raised by the multiplexer
      , ErrorPolicy
          $ \e -> case e of
            Mx.UnknownMiniProtocol {}    -> Just ourBug
            Mx.IngressQueueOverRun {}    -> Just ourBug
            Mx.InitiatorOnly {}          -> Just ourBug
            Mx.Shutdown {}               -> Just ourBug

            -- in case of bearer closed / or IOException we suspend
            -- the peer for a short time
            --
            -- TODO: the same notes apply as to
            -- 'NodeToNode.networkErrorPolicies'
            Mx.BearerClosed {}      -> Just (SuspendPeer shortDelay shortDelay)
            Mx.IOException {}       -> Just (SuspendPeer shortDelay shortDelay)
            Mx.SDUDecodeError {}    -> Just ourBug
            Mx.SDUReadTimeout       -> Just (SuspendPeer shortDelay shortDelay)
            Mx.SDUWriteTimeout      -> Just (SuspendPeer shortDelay shortDelay)

        -- runtime errors raised by the multiplexer
      , ErrorPolicy
          $ \(e :: Mx.RuntimeError)
                -> case e of
                     Mx.ProtocolAlreadyRunning       {} -> Just ourBug
                     Mx.UnknownProtocolInternalError {} -> Just ourBug
                     Mx.BlockedOnCompletionVar       {} -> Just ourBug

        -- Error thrown by 'IOManager', this is fatal on Windows, and it will
        -- never fire on other platforms.
      , ErrorPolicy
          $ \(_ :: IOManagerError)
                -> Just Throw

        -- Using 'error' throws.
      , ErrorPolicy
          $ \(_ :: ErrorCall)
                -> Just Throw

        -- Using 'panic' throws.
      , ErrorPolicy
          $ \(_ :: FatalError)
                -> Just Throw
      ]
    , epConErrorPolicies = [
        -- If an 'IOException' is thrown by the 'connect' call we suspend the
        -- peer for 'shortDelay' and we will try to re-connect to it after that
        -- period.
        ErrorPolicy $ \(_ :: IOException) -> Just $
          SuspendPeer shortDelay shortDelay

      , ErrorPolicy
          $ \(_ :: IOManagerError)
                -> Just Throw
      ]
    }
  where
    -- decision used for errors which indicate a bug on our side
    ourBug :: SuspendDecision DiffTime
    ourBug = Throw

    -- both arguments of 'SuspendPeer' are set to this delay
    shortDelay :: DiffTime
    shortDelay = 20 -- seconds

-- | A 'ConnectionId' in which addresses are 'LocalAddress'es.
type LocalConnectionId = ConnectionId LocalAddress

--
-- Null Protocol Peers
--

-- | A chain-sync client peer which never produces a protocol step: the
-- action that would yield its first idle-state step is
-- 'untilTheCowsComeHome', which loops on 'threadDelay' forever.
--
chainSyncPeerNull
    :: forall (header :: Type) (point :: Type) (tip :: Type) m a. MonadDelay m
    => Client (ChainSync.ChainSync header point tip)
               NonPipelined ChainSync.StIdle m a
chainSyncPeerNull =
    ChainSync.chainSyncClientPeer
      (ChainSync.ChainSyncClient untilTheCowsComeHome)

-- | A local-state-query client peer which never produces a protocol step:
-- the action that would yield its first idle-state step is
-- 'untilTheCowsComeHome', which loops on 'threadDelay' forever.
--
localStateQueryPeerNull
    :: forall (block :: Type) (point :: Type) (query :: Type -> Type) m a.
       MonadDelay m
    => Stateful.Client (LocalStateQuery.LocalStateQuery block point query)
                       LocalStateQuery.StIdle LocalStateQuery.State m a
localStateQueryPeerNull =
    LocalStateQuery.localStateQueryClientPeer
      (LocalStateQuery.LocalStateQueryClient untilTheCowsComeHome)

-- | A local-tx-submission client peer which never produces a protocol step:
-- the action that would yield its first idle-state step is
-- 'untilTheCowsComeHome', which loops on 'threadDelay' forever.
--
localTxSubmissionPeerNull
    :: forall (tx :: Type) (reject :: Type) m a. MonadDelay m
    => Client (LocalTxSubmission.LocalTxSubmission tx reject)
              NonPipelined LocalTxSubmission.StIdle m a
localTxSubmissionPeerNull =
    LocalTxSubmission.localTxSubmissionClientPeer
      (LocalTxSubmission.LocalTxSubmissionClient untilTheCowsComeHome)

-- | A local-tx-monitor client peer which never produces a protocol step:
-- the action that would yield its first idle-state step is
-- 'untilTheCowsComeHome', which loops on 'threadDelay' forever.
--
localTxMonitorPeerNull
    :: forall (txid :: Type) (tx :: Type) (slot :: Type) m a. MonadDelay m
    => Client (LocalTxMonitor.LocalTxMonitor txid tx slot)
              NonPipelined LocalTxMonitor.StIdle m a
localTxMonitorPeerNull =
    LocalTxMonitor.localTxMonitorClientPeer
      (LocalTxMonitor.LocalTxMonitorClient untilTheCowsComeHome)

-- | Block forever ;)
--
-- Repeatedly calls 'threadDelay' with 43200 seconds (12 hours) and never
-- returns a value, which is why the result type @a@ is unconstrained.
--
untilTheCowsComeHome :: MonadDelay m => m a
untilTheCowsComeHome = forever $ threadDelay 43200 {- half a day in seconds -}