{-# LANGUAGE DataKinds        #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RankNTypes       #-}

module DMQ.NodeToClient
  ( module DMQ.NodeToClient.Version
  , Protocols (..)
  , HandshakeTr
  , Apps
  , dmqCodecs
  , ntcApps
  , ntcHandshakeArguments
  , responders
  ) where

import Data.Aeson qualified as Aeson
import Data.ByteString.Lazy (ByteString)
import Data.Functor.Contravariant ((>$<))
import Data.Typeable (Typeable)
import Data.Void
import Data.Word

import Control.Concurrent.Class.MonadSTM
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadST (MonadST)
import Control.Monad.Class.MonadThrow
import Control.Tracer (Tracer, nullTracer)

import Codec.CBOR.Decoding qualified as CBOR
import Codec.CBOR.Encoding qualified as CBOR
import Codec.CBOR.Term qualified as CBOR

import Cardano.KESAgent.KES.Crypto (Crypto (..))

import Network.Mux qualified as Mx
import Network.TypedProtocol.Codec hiding (decode, encode)
import Network.TypedProtocol.Codec.CBOR qualified as CBOR

import DMQ.Configuration
import DMQ.NodeToClient.LocalMsgNotification
import DMQ.NodeToClient.LocalMsgSubmission
import DMQ.NodeToClient.Version
import DMQ.Protocol.LocalMsgNotification.Codec
import DMQ.Protocol.LocalMsgNotification.Server
import DMQ.Protocol.LocalMsgNotification.Type
import DMQ.Protocol.LocalMsgSubmission.Codec
import DMQ.Protocol.LocalMsgSubmission.Server
import DMQ.Protocol.LocalMsgSubmission.Type
import DMQ.Protocol.SigSubmission.Type (Sig, SigId, sigId)
import DMQ.Tracer

import Ouroboros.Network.Context
import Ouroboros.Network.Driver.Simple
import Ouroboros.Network.Handshake.Acceptable (Acceptable (..))
import Ouroboros.Network.Handshake.Queryable (Queryable (..))
import Ouroboros.Network.Mux
import Ouroboros.Network.OrphanInstances ()
import Ouroboros.Network.Protocol.Handshake (Handshake, HandshakeArguments (..))
import Ouroboros.Network.Protocol.Handshake.Codec (cborTermVersionDataCodec,
           codecHandshake, noTimeLimitsHandshake)
import Ouroboros.Network.TxSubmission.Inbound.V2.Types
           (TxSubmissionMempoolWriter)
import Ouroboros.Network.TxSubmission.Mempool.Reader


type HandshakeTr ntcAddr = Mx.WithBearer (ConnectionId ntcAddr) (TraceSendRecv (Handshake NodeToClientVersion CBOR.Term))

ntcHandshakeArguments
  :: MonadST m
  => Tracer m (HandshakeTr ntcAddr)
  -> HandshakeArguments
      (ConnectionId ntcAddr)
      NodeToClientVersion
      NodeToClientVersionData
      m
ntcHandshakeArguments :: forall (m :: * -> *) ntcAddr.
MonadST m =>
Tracer m (HandshakeTr ntcAddr)
-> HandshakeArguments
     (ConnectionId ntcAddr)
     NodeToClientVersion
     NodeToClientVersionData
     m
ntcHandshakeArguments Tracer m (HandshakeTr ntcAddr)
tracer =
  HandshakeArguments {
    haHandshakeTracer :: Tracer m (HandshakeTr ntcAddr)
haHandshakeTracer  = Tracer m (HandshakeTr ntcAddr)
tracer
  , haBearerTracer :: Tracer m (WithBearer (ConnectionId ntcAddr) BearerTrace)
haBearerTracer     = Tracer m (WithBearer (ConnectionId ntcAddr) BearerTrace)
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer -- TODO
  , haHandshakeCodec :: Codec
  (Handshake NodeToClientVersion Term)
  DeserialiseFailure
  m
  ByteString
haHandshakeCodec   = CodecCBORTerm (Text, Maybe Int) NodeToClientVersion
-> Codec
     (Handshake NodeToClientVersion Term)
     DeserialiseFailure
     m
     ByteString
forall vNumber (m :: * -> *) failure.
(MonadST m, Ord vNumber, Show failure) =>
CodecCBORTerm (failure, Maybe Int) vNumber
-> Codec (Handshake vNumber Term) DeserialiseFailure m ByteString
codecHandshake CodecCBORTerm (Text, Maybe Int) NodeToClientVersion
nodeToClientVersionCodec
  , haVersionDataCodec :: VersionDataCodec Term NodeToClientVersion NodeToClientVersionData
haVersionDataCodec =
      (NodeToClientVersion -> CodecCBORTerm Text NodeToClientVersionData)
-> VersionDataCodec
     Term NodeToClientVersion NodeToClientVersionData
forall vNumber vData.
(vNumber -> CodecCBORTerm Text vData)
-> VersionDataCodec Term vNumber vData
cborTermVersionDataCodec
        NodeToClientVersion -> CodecCBORTerm Text NodeToClientVersionData
nodeToClientCodecCBORTerm
  , haAcceptVersion :: NodeToClientVersionData
-> NodeToClientVersionData -> Accept NodeToClientVersionData
haAcceptVersion = NodeToClientVersionData
-> NodeToClientVersionData -> Accept NodeToClientVersionData
forall v. Acceptable v => v -> v -> Accept v
acceptableVersion
  , haQueryVersion :: NodeToClientVersionData -> Bool
haQueryVersion  = NodeToClientVersionData -> Bool
forall v. Queryable v => v -> Bool
queryVersion
  , haTimeLimits :: ProtocolTimeLimits (Handshake NodeToClientVersion Term)
haTimeLimits    = ProtocolTimeLimits (Handshake NodeToClientVersion Term)
forall {k} (vNumber :: k).
ProtocolTimeLimits (Handshake vNumber Term)
noTimeLimitsHandshake
  }


data Codecs crypto m =
  Codecs {
    forall crypto (m :: * -> *).
Codecs crypto m
-> AnnotatedCodec
     (LocalMsgSubmission (Sig crypto)) DeserialiseFailure m ByteString
msgSubmissionCodec
      :: !(AnnotatedCodec (LocalMsgSubmission (Sig crypto))
                 CBOR.DeserialiseFailure m ByteString)
  , forall crypto (m :: * -> *).
Codecs crypto m
-> AnnotatedCodec
     (LocalMsgNotification (Sig crypto)) DeserialiseFailure m ByteString
msgNotificationCodec
      :: !(AnnotatedCodec (LocalMsgNotification (Sig crypto))
               CBOR.DeserialiseFailure m ByteString)
  }

dmqCodecs :: ( MonadST m
             , Crypto crypto
             )
          => (SigMempoolFail -> CBOR.Encoding)
          -> (forall s. CBOR.Decoder s SigMempoolFail)
          -> Codecs crypto m
dmqCodecs :: forall (m :: * -> *) crypto.
(MonadST m, Crypto crypto) =>
(SigMempoolFail -> Encoding)
-> (forall s. Decoder s SigMempoolFail) -> Codecs crypto m
dmqCodecs SigMempoolFail -> Encoding
encodeReject' forall s. Decoder s SigMempoolFail
decodeReject' =
  Codecs {
    msgSubmissionCodec :: AnnotatedCodec
  (LocalMsgSubmission (Sig crypto)) DeserialiseFailure m ByteString
msgSubmissionCodec   = (SigMempoolFail -> Encoding)
-> (forall s. Decoder s SigMempoolFail)
-> AnnotatedCodec
     (LocalMsgSubmission (Sig crypto)) DeserialiseFailure m ByteString
forall crypto (m :: * -> *).
(MonadST m, Crypto crypto) =>
(SigMempoolFail -> Encoding)
-> (forall s. Decoder s SigMempoolFail)
-> AnnotatedCodec
     (LocalMsgSubmission (Sig crypto)) DeserialiseFailure m ByteString
codecLocalMsgSubmission SigMempoolFail -> Encoding
encodeReject' Decoder s SigMempoolFail
forall s. Decoder s SigMempoolFail
decodeReject'
  , msgNotificationCodec :: AnnotatedCodec
  (LocalMsgNotification (Sig crypto)) DeserialiseFailure m ByteString
msgNotificationCodec = AnnotatedCodec
  (LocalMsgNotification (Sig crypto)) DeserialiseFailure m ByteString
forall crypto (m :: * -> *).
(MonadST m, Crypto crypto) =>
AnnotatedCodec
  (LocalMsgNotification (Sig crypto)) DeserialiseFailure m ByteString
codecLocalMsgNotification
  }


-- | A node-to-client application
--
type App ntcAddr m a =
     NodeToClientVersion
  -> ResponderContext ntcAddr
  -> Mx.Channel m ByteString
  -> m (a, Maybe ByteString)


data Apps ntcAddr m a =
  Apps {
    -- | Start a sig-submission client
    forall ntcAddr (m :: * -> *) a. Apps ntcAddr m a -> App ntcAddr m a
aLocalMsgSubmission   :: !(App ntcAddr m a)

    -- | Start a sig-submission server
  , forall ntcAddr (m :: * -> *) a. Apps ntcAddr m a -> App ntcAddr m a
aLocalMsgNotification :: !(App ntcAddr m a)
  }


-- | Construct applications for the node-to-client protocols
--
ntcApps
  :: forall crypto idx ntcAddr m.
     ( MonadThrow m
     , MonadThread m
     , MonadSTM m
     , Crypto crypto
     , Typeable crypto
     , Aeson.ToJSON ntcAddr
     )
  => (forall ev. Aeson.ToJSON ev => Tracer m (WithEventType ev))
  -> Configuration
  -> TxSubmissionMempoolReader SigId (Sig crypto) idx m
  -> TxSubmissionMempoolWriter SigId (Sig crypto) idx m
  -> Word16
  -> Codecs crypto m
  -> Apps ntcAddr m ()
ntcApps :: forall crypto idx ntcAddr (m :: * -> *).
(MonadThrow m, MonadThread m, MonadSTM m, Crypto crypto,
 Typeable crypto, ToJSON ntcAddr) =>
(forall ev. ToJSON ev => Tracer m (WithEventType ev))
-> Configuration
-> TxSubmissionMempoolReader SigId (Sig crypto) idx m
-> TxSubmissionMempoolWriter SigId (Sig crypto) idx m
-> Word16
-> Codecs crypto m
-> Apps ntcAddr m ()
ntcApps forall ev. ToJSON ev => Tracer m (WithEventType ev)
tracer
        Configuration { dmqcLocalMsgSubmissionServerProtocolTracer :: forall (f :: * -> *). Configuration' f -> f Bool
dmqcLocalMsgSubmissionServerProtocolTracer   = I Bool
localMsgSubmissionServerProtocolTracer,
                        dmqcLocalMsgNotificationServerProtocolTracer :: forall (f :: * -> *). Configuration' f -> f Bool
dmqcLocalMsgNotificationServerProtocolTracer = I Bool
localMsgNotificationServerProtocolTracer,
                        dmqcLocalMsgSubmissionServerTracer :: forall (f :: * -> *). Configuration' f -> f Bool
dmqcLocalMsgSubmissionServerTracer           = I Bool
localMsgSubmissionServerTracer
                      }
        TxSubmissionMempoolReader SigId (Sig crypto) idx m
mempoolReader
        TxSubmissionMempoolWriter SigId (Sig crypto) idx m
mempoolWriter
        Word16
maxMsgs
        Codecs { AnnotatedCodec
  (LocalMsgSubmission (Sig crypto)) DeserialiseFailure m ByteString
msgSubmissionCodec :: forall crypto (m :: * -> *).
Codecs crypto m
-> AnnotatedCodec
     (LocalMsgSubmission (Sig crypto)) DeserialiseFailure m ByteString
msgSubmissionCodec :: AnnotatedCodec
  (LocalMsgSubmission (Sig crypto)) DeserialiseFailure m ByteString
msgSubmissionCodec, AnnotatedCodec
  (LocalMsgNotification (Sig crypto)) DeserialiseFailure m ByteString
msgNotificationCodec :: forall crypto (m :: * -> *).
Codecs crypto m
-> AnnotatedCodec
     (LocalMsgNotification (Sig crypto)) DeserialiseFailure m ByteString
msgNotificationCodec :: AnnotatedCodec
  (LocalMsgNotification (Sig crypto)) DeserialiseFailure m ByteString
msgNotificationCodec } =
  Apps {
    App ntcAddr m ()
aLocalMsgSubmission :: App ntcAddr m ()
aLocalMsgSubmission :: App ntcAddr m ()
aLocalMsgSubmission
  , App ntcAddr m ()
aLocalMsgNotification :: App ntcAddr m ()
aLocalMsgNotification :: App ntcAddr m ()
aLocalMsgNotification
  }
  where
    aLocalMsgSubmission :: App ntcAddr m ()
aLocalMsgSubmission NodeToClientVersion
_version ResponderContext { rcConnectionId :: forall addr. ResponderContext addr -> ConnectionId addr
rcConnectionId = ConnectionId ntcAddr
connId } Channel m ByteString
channel = do
      String -> m ()
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread String
"LocalMsgSubmission.Server"
      Tracer m (TraceSendRecv (LocalMsgSubmission (Sig crypto)))
-> AnnotatedCodec
     (LocalMsgSubmission (Sig crypto)) DeserialiseFailure m ByteString
-> Channel m ByteString
-> Peer
     (LocalMsgSubmission (Sig crypto))
     'AsServer
     'NonPipelined
     'StIdle
     m
     ()
-> m ((), Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
       a.
(MonadThrow m, Monoid bytes, ShowProxy ps,
 forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
 Show failure) =>
Tracer m (TraceSendRecv ps)
-> AnnotatedCodec ps failure m bytes
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runAnnotatedPeer
        (if Bool
localMsgSubmissionServerProtocolTracer
           then String
-> WithBearer
     (ConnectionId ntcAddr)
     (TraceSendRecv (LocalMsgSubmission (Sig crypto)))
-> WithEventType
     (WithBearer
        (ConnectionId ntcAddr)
        (TraceSendRecv (LocalMsgSubmission (Sig crypto))))
forall a. String -> a -> WithEventType a
WithEventType String
"LocalMsgSubmission.Protocol.Server" (WithBearer
   (ConnectionId ntcAddr)
   (TraceSendRecv (LocalMsgSubmission (Sig crypto)))
 -> WithEventType
      (WithBearer
         (ConnectionId ntcAddr)
         (TraceSendRecv (LocalMsgSubmission (Sig crypto)))))
-> (TraceSendRecv (LocalMsgSubmission (Sig crypto))
    -> WithBearer
         (ConnectionId ntcAddr)
         (TraceSendRecv (LocalMsgSubmission (Sig crypto))))
-> TraceSendRecv (LocalMsgSubmission (Sig crypto))
-> WithEventType
     (WithBearer
        (ConnectionId ntcAddr)
        (TraceSendRecv (LocalMsgSubmission (Sig crypto))))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionId ntcAddr
-> TraceSendRecv (LocalMsgSubmission (Sig crypto))
-> WithBearer
     (ConnectionId ntcAddr)
     (TraceSendRecv (LocalMsgSubmission (Sig crypto)))
forall peerid a. peerid -> a -> WithBearer peerid a
Mx.WithBearer ConnectionId ntcAddr
connId (TraceSendRecv (LocalMsgSubmission (Sig crypto))
 -> WithEventType
      (WithBearer
         (ConnectionId ntcAddr)
         (TraceSendRecv (LocalMsgSubmission (Sig crypto)))))
-> Tracer
     m
     (WithEventType
        (WithBearer
           (ConnectionId ntcAddr)
           (TraceSendRecv (LocalMsgSubmission (Sig crypto)))))
-> Tracer m (TraceSendRecv (LocalMsgSubmission (Sig crypto)))
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< Tracer
  m
  (WithEventType
     (WithBearer
        (ConnectionId ntcAddr)
        (TraceSendRecv (LocalMsgSubmission (Sig crypto)))))
forall ev. ToJSON ev => Tracer m (WithEventType ev)
tracer
           else Tracer m (TraceSendRecv (LocalMsgSubmission (Sig crypto)))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer)
        AnnotatedCodec
  (LocalMsgSubmission (Sig crypto)) DeserialiseFailure m ByteString
msgSubmissionCodec
        Channel m ByteString
channel
        (m (LocalMsgSubmissionServer (Sig crypto) m ())
-> Peer
     (LocalMsgSubmission (Sig crypto))
     'AsServer
     'NonPipelined
     'StIdle
     m
     ()
forall msg (m :: * -> *) a.
Monad m =>
m (LocalMsgSubmissionServer msg m a)
-> Server (LocalMsgSubmission msg) 'NonPipelined 'StIdle m a
localMsgSubmissionServerPeer (m (LocalMsgSubmissionServer (Sig crypto) m ())
 -> Peer
      (LocalMsgSubmission (Sig crypto))
      'AsServer
      'NonPipelined
      'StIdle
      m
      ())
-> m (LocalMsgSubmissionServer (Sig crypto) m ())
-> Peer
     (LocalMsgSubmission (Sig crypto))
     'AsServer
     'NonPipelined
     'StIdle
     m
     ()
forall a b. (a -> b) -> a -> b
$
          (Sig crypto -> SigId)
-> Tracer m (TraceLocalMsgSubmission SigId)
-> TxSubmissionMempoolWriter SigId (Sig crypto) idx m
-> m (LocalMsgSubmissionServer (Sig crypto) m ())
forall (m :: * -> *) msg msgid idx.
MonadSTM m =>
(msg -> msgid)
-> Tracer m (TraceLocalMsgSubmission msgid)
-> TxSubmissionMempoolWriter msgid msg idx m
-> m (LocalMsgSubmissionServer msg m ())
localMsgSubmissionServer
            Sig crypto -> SigId
forall crypto. Sig crypto -> SigId
sigId
            -- TODO: use a separate option for this tracer rather than reusing
            -- `dmqLocalMsgSubmissionServerTracer`.
            (if Bool
localMsgSubmissionServerTracer
               then String
-> WithBearer
     (ConnectionId ntcAddr) (TraceLocalMsgSubmission SigId)
-> WithEventType
     (WithBearer (ConnectionId ntcAddr) (TraceLocalMsgSubmission SigId))
forall a. String -> a -> WithEventType a
WithEventType String
"LocalMsgSubmission.Server" (WithBearer (ConnectionId ntcAddr) (TraceLocalMsgSubmission SigId)
 -> WithEventType
      (WithBearer
         (ConnectionId ntcAddr) (TraceLocalMsgSubmission SigId)))
-> (TraceLocalMsgSubmission SigId
    -> WithBearer
         (ConnectionId ntcAddr) (TraceLocalMsgSubmission SigId))
-> TraceLocalMsgSubmission SigId
-> WithEventType
     (WithBearer (ConnectionId ntcAddr) (TraceLocalMsgSubmission SigId))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionId ntcAddr
-> TraceLocalMsgSubmission SigId
-> WithBearer
     (ConnectionId ntcAddr) (TraceLocalMsgSubmission SigId)
forall peerid a. peerid -> a -> WithBearer peerid a
Mx.WithBearer ConnectionId ntcAddr
connId (TraceLocalMsgSubmission SigId
 -> WithEventType
      (WithBearer
         (ConnectionId ntcAddr) (TraceLocalMsgSubmission SigId)))
-> Tracer
     m
     (WithEventType
        (WithBearer
           (ConnectionId ntcAddr) (TraceLocalMsgSubmission SigId)))
-> Tracer m (TraceLocalMsgSubmission SigId)
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< Tracer
  m
  (WithEventType
     (WithBearer
        (ConnectionId ntcAddr) (TraceLocalMsgSubmission SigId)))
forall ev. ToJSON ev => Tracer m (WithEventType ev)
tracer
               else Tracer m (TraceLocalMsgSubmission SigId)
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer)
            TxSubmissionMempoolWriter SigId (Sig crypto) idx m
mempoolWriter)

    aLocalMsgNotification :: App ntcAddr m ()
aLocalMsgNotification NodeToClientVersion
_version ResponderContext { rcConnectionId :: forall addr. ResponderContext addr -> ConnectionId addr
rcConnectionId = ConnectionId ntcAddr
connId } Channel m ByteString
channel = do
      String -> m ()
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread String
"LocalMsgNotification.Server"
      Tracer m (TraceSendRecv (LocalMsgNotification (Sig crypto)))
-> AnnotatedCodec
     (LocalMsgNotification (Sig crypto)) DeserialiseFailure m ByteString
-> Channel m ByteString
-> Peer
     (LocalMsgNotification (Sig crypto))
     'AsServer
     'NonPipelined
     StIdle
     m
     ()
-> m ((), Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
       a.
(MonadThrow m, Monoid bytes, ShowProxy ps,
 forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
 Show failure) =>
Tracer m (TraceSendRecv ps)
-> AnnotatedCodec ps failure m bytes
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runAnnotatedPeer
        (if Bool
localMsgNotificationServerProtocolTracer
           then String
-> WithBearer
     (ConnectionId ntcAddr)
     (TraceSendRecv (LocalMsgNotification (Sig crypto)))
-> WithEventType
     (WithBearer
        (ConnectionId ntcAddr)
        (TraceSendRecv (LocalMsgNotification (Sig crypto))))
forall a. String -> a -> WithEventType a
WithEventType String
"LocalMsgNotification.Protocol.Server" (WithBearer
   (ConnectionId ntcAddr)
   (TraceSendRecv (LocalMsgNotification (Sig crypto)))
 -> WithEventType
      (WithBearer
         (ConnectionId ntcAddr)
         (TraceSendRecv (LocalMsgNotification (Sig crypto)))))
-> (TraceSendRecv (LocalMsgNotification (Sig crypto))
    -> WithBearer
         (ConnectionId ntcAddr)
         (TraceSendRecv (LocalMsgNotification (Sig crypto))))
-> TraceSendRecv (LocalMsgNotification (Sig crypto))
-> WithEventType
     (WithBearer
        (ConnectionId ntcAddr)
        (TraceSendRecv (LocalMsgNotification (Sig crypto))))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionId ntcAddr
-> TraceSendRecv (LocalMsgNotification (Sig crypto))
-> WithBearer
     (ConnectionId ntcAddr)
     (TraceSendRecv (LocalMsgNotification (Sig crypto)))
forall peerid a. peerid -> a -> WithBearer peerid a
Mx.WithBearer ConnectionId ntcAddr
connId (TraceSendRecv (LocalMsgNotification (Sig crypto))
 -> WithEventType
      (WithBearer
         (ConnectionId ntcAddr)
         (TraceSendRecv (LocalMsgNotification (Sig crypto)))))
-> Tracer
     m
     (WithEventType
        (WithBearer
           (ConnectionId ntcAddr)
           (TraceSendRecv (LocalMsgNotification (Sig crypto)))))
-> Tracer m (TraceSendRecv (LocalMsgNotification (Sig crypto)))
forall (f :: * -> *) a b. Contravariant f => (a -> b) -> f b -> f a
>$< Tracer
  m
  (WithEventType
     (WithBearer
        (ConnectionId ntcAddr)
        (TraceSendRecv (LocalMsgNotification (Sig crypto)))))
forall ev. ToJSON ev => Tracer m (WithEventType ev)
tracer
           else Tracer m (TraceSendRecv (LocalMsgNotification (Sig crypto)))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer)
        AnnotatedCodec
  (LocalMsgNotification (Sig crypto)) DeserialiseFailure m ByteString
msgNotificationCodec
        Channel m ByteString
channel
        (LocalMsgNotificationServer m (Sig crypto) ()
-> Peer
     (LocalMsgNotification (Sig crypto))
     'AsServer
     'NonPipelined
     StIdle
     m
     ()
forall (m :: * -> *) msg a.
Monad m =>
LocalMsgNotificationServer m msg a
-> Server (LocalMsgNotification msg) 'NonPipelined StIdle m a
localMsgNotificationServerPeer (LocalMsgNotificationServer m (Sig crypto) ()
 -> Peer
      (LocalMsgNotification (Sig crypto))
      'AsServer
      'NonPipelined
      StIdle
      m
      ())
-> LocalMsgNotificationServer m (Sig crypto) ()
-> Peer
     (LocalMsgNotification (Sig crypto))
     'AsServer
     'NonPipelined
     StIdle
     m
     ()
forall a b. (a -> b) -> a -> b
$
          Tracer m (TraceMessageNotificationServer (Sig crypto))
-> m ()
-> Word16
-> TxSubmissionMempoolReader SigId (Sig crypto) idx m
-> LocalMsgNotificationServer m (Sig crypto) ()
forall (m :: * -> *) msg msgid idx a.
MonadSTM m =>
Tracer m (TraceMessageNotificationServer msg)
-> m a
-> Word16
-> TxSubmissionMempoolReader msgid msg idx m
-> LocalMsgNotificationServer m msg a
localMsgNotificationServer
            Tracer m (TraceMessageNotificationServer (Sig crypto))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
            (() -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) Word16
maxMsgs TxSubmissionMempoolReader SigId (Sig crypto) idx m
mempoolReader)


data Protocols appType ntcAddr bytes m a b =
  Protocols {
    forall (appType :: Mode) ntcAddr bytes (m :: * -> *) a b.
Protocols appType ntcAddr bytes m a b
-> RunMiniProtocolWithMinimalCtx appType ntcAddr bytes m a b
msgSubmissionProtocol   :: !(RunMiniProtocolWithMinimalCtx appType ntcAddr bytes m a b)
  , forall (appType :: Mode) ntcAddr bytes (m :: * -> *) a b.
Protocols appType ntcAddr bytes m a b
-> RunMiniProtocolWithMinimalCtx appType ntcAddr bytes m a b
msgNotificationProtocol :: !(RunMiniProtocolWithMinimalCtx appType ntcAddr bytes m a b)
  }

responders
  :: Apps ntcAddr m a
  -> NodeToClientVersion
  -> NodeToClientVersionData
  -> OuroborosApplicationWithMinimalCtx Mx.ResponderMode ntcAddr ByteString m Void a
responders :: forall ntcAddr (m :: * -> *) a.
Apps ntcAddr m a
-> NodeToClientVersion
-> NodeToClientVersionData
-> OuroborosApplicationWithMinimalCtx
     'ResponderMode ntcAddr ByteString m Void a
responders Apps {
             App ntcAddr m a
aLocalMsgSubmission :: forall ntcAddr (m :: * -> *) a. Apps ntcAddr m a -> App ntcAddr m a
aLocalMsgSubmission :: App ntcAddr m a
aLocalMsgSubmission
           , App ntcAddr m a
aLocalMsgNotification :: forall ntcAddr (m :: * -> *) a. Apps ntcAddr m a -> App ntcAddr m a
aLocalMsgNotification :: App ntcAddr m a
aLocalMsgNotification
           }
           NodeToClientVersion
version =
  Protocols 'ResponderMode ntcAddr ByteString m Void a
-> NodeToClientVersion
-> NodeToClientVersionData
-> OuroborosApplicationWithMinimalCtx
     'ResponderMode ntcAddr ByteString m Void a
forall (appType :: Mode) ntcAddr bytes (m :: * -> *) a b.
Protocols appType ntcAddr bytes m a b
-> NodeToClientVersion
-> NodeToClientVersionData
-> OuroborosApplicationWithMinimalCtx appType ntcAddr bytes m a b
nodeToClientProtocols
    Protocols {
      msgSubmissionProtocol :: RunMiniProtocolWithMinimalCtx
  'ResponderMode ntcAddr ByteString m Void a
msgSubmissionProtocol =
        MiniProtocolCb (ResponderContext ntcAddr) ByteString m a
-> RunMiniProtocolWithMinimalCtx
     'ResponderMode ntcAddr ByteString m Void a
forall responderCtx bytes (m :: * -> *) b initiatorCtx.
MiniProtocolCb responderCtx bytes m b
-> RunMiniProtocol
     'ResponderMode initiatorCtx responderCtx bytes m Void b
ResponderProtocolOnly (MiniProtocolCb (ResponderContext ntcAddr) ByteString m a
 -> RunMiniProtocolWithMinimalCtx
      'ResponderMode ntcAddr ByteString m Void a)
-> MiniProtocolCb (ResponderContext ntcAddr) ByteString m a
-> RunMiniProtocolWithMinimalCtx
     'ResponderMode ntcAddr ByteString m Void a
forall a b. (a -> b) -> a -> b
$
           (ResponderContext ntcAddr
 -> Channel m ByteString -> m (a, Maybe ByteString))
-> MiniProtocolCb (ResponderContext ntcAddr) ByteString m a
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb ((ResponderContext ntcAddr
  -> Channel m ByteString -> m (a, Maybe ByteString))
 -> MiniProtocolCb (ResponderContext ntcAddr) ByteString m a)
-> (ResponderContext ntcAddr
    -> Channel m ByteString -> m (a, Maybe ByteString))
-> MiniProtocolCb (ResponderContext ntcAddr) ByteString m a
forall a b. (a -> b) -> a -> b
$ App ntcAddr m a
aLocalMsgSubmission NodeToClientVersion
version
    , msgNotificationProtocol :: RunMiniProtocolWithMinimalCtx
  'ResponderMode ntcAddr ByteString m Void a
msgNotificationProtocol =
        MiniProtocolCb (ResponderContext ntcAddr) ByteString m a
-> RunMiniProtocolWithMinimalCtx
     'ResponderMode ntcAddr ByteString m Void a
forall responderCtx bytes (m :: * -> *) b initiatorCtx.
MiniProtocolCb responderCtx bytes m b
-> RunMiniProtocol
     'ResponderMode initiatorCtx responderCtx bytes m Void b
ResponderProtocolOnly (MiniProtocolCb (ResponderContext ntcAddr) ByteString m a
 -> RunMiniProtocolWithMinimalCtx
      'ResponderMode ntcAddr ByteString m Void a)
-> MiniProtocolCb (ResponderContext ntcAddr) ByteString m a
-> RunMiniProtocolWithMinimalCtx
     'ResponderMode ntcAddr ByteString m Void a
forall a b. (a -> b) -> a -> b
$
           (ResponderContext ntcAddr
 -> Channel m ByteString -> m (a, Maybe ByteString))
-> MiniProtocolCb (ResponderContext ntcAddr) ByteString m a
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb ((ResponderContext ntcAddr
  -> Channel m ByteString -> m (a, Maybe ByteString))
 -> MiniProtocolCb (ResponderContext ntcAddr) ByteString m a)
-> (ResponderContext ntcAddr
    -> Channel m ByteString -> m (a, Maybe ByteString))
-> MiniProtocolCb (ResponderContext ntcAddr) ByteString m a
forall a b. (a -> b) -> a -> b
$ App ntcAddr m a
aLocalMsgNotification NodeToClientVersion
version
    }
    NodeToClientVersion
version


-- | Make an 'OuroborosApplication' for the bundle of mini-protocols that
-- make up the overall node-to-client protocol.
--
-- This function specifies the wire format protocol numbers as well as the
-- protocols that run for each 'NodeToClientVersion'.
--
-- They are chosen to not overlap with the node to node protocol numbers.
-- This is not essential for correctness, but is helpful to allow a single
-- shared implementation of tools that can analyse both protocols, e.g.
-- wireshark plugins.
--
nodeToClientProtocols
  :: Protocols appType ntcAddr bytes m a b
  -> NodeToClientVersion
  -> NodeToClientVersionData
  -> OuroborosApplicationWithMinimalCtx appType ntcAddr bytes m a b
nodeToClientProtocols :: forall (appType :: Mode) ntcAddr bytes (m :: * -> *) a b.
Protocols appType ntcAddr bytes m a b
-> NodeToClientVersion
-> NodeToClientVersionData
-> OuroborosApplicationWithMinimalCtx appType ntcAddr bytes m a b
nodeToClientProtocols Protocols appType ntcAddr bytes m a b
protocols NodeToClientVersion
_version NodeToClientVersionData
_versionData =
  [MiniProtocol
   appType
   (MinimalInitiatorContext ntcAddr)
   (ResponderContext ntcAddr)
   bytes
   m
   a
   b]
-> OuroborosApplication
     appType
     (MinimalInitiatorContext ntcAddr)
     (ResponderContext ntcAddr)
     bytes
     m
     a
     b
forall (mode :: Mode) initiatorCtx responderCtx bytes (m :: * -> *)
       a b.
[MiniProtocol mode initiatorCtx responderCtx bytes m a b]
-> OuroborosApplication mode initiatorCtx responderCtx bytes m a b
OuroborosApplication ([MiniProtocol
    appType
    (MinimalInitiatorContext ntcAddr)
    (ResponderContext ntcAddr)
    bytes
    m
    a
    b]
 -> OuroborosApplication
      appType
      (MinimalInitiatorContext ntcAddr)
      (ResponderContext ntcAddr)
      bytes
      m
      a
      b)
-> [MiniProtocol
      appType
      (MinimalInitiatorContext ntcAddr)
      (ResponderContext ntcAddr)
      bytes
      m
      a
      b]
-> OuroborosApplication
     appType
     (MinimalInitiatorContext ntcAddr)
     (ResponderContext ntcAddr)
     bytes
     m
     a
     b
forall a b. (a -> b) -> a -> b
$
    case Protocols appType ntcAddr bytes m a b
protocols of
      Protocols {
        RunMiniProtocolWithMinimalCtx appType ntcAddr bytes m a b
msgSubmissionProtocol :: forall (appType :: Mode) ntcAddr bytes (m :: * -> *) a b.
Protocols appType ntcAddr bytes m a b
-> RunMiniProtocolWithMinimalCtx appType ntcAddr bytes m a b
msgSubmissionProtocol :: RunMiniProtocolWithMinimalCtx appType ntcAddr bytes m a b
msgSubmissionProtocol
      , RunMiniProtocolWithMinimalCtx appType ntcAddr bytes m a b
msgNotificationProtocol :: forall (appType :: Mode) ntcAddr bytes (m :: * -> *) a b.
Protocols appType ntcAddr bytes m a b
-> RunMiniProtocolWithMinimalCtx appType ntcAddr bytes m a b
msgNotificationProtocol :: RunMiniProtocolWithMinimalCtx appType ntcAddr bytes m a b
msgNotificationProtocol
      } ->
        [ RunMiniProtocolWithMinimalCtx appType ntcAddr bytes m a b
-> MiniProtocol
     appType
     (MinimalInitiatorContext ntcAddr)
     (ResponderContext ntcAddr)
     bytes
     m
     a
     b
forall {mode :: Mode} {initiatorCtx} {responderCtx} {bytes}
       {m :: * -> *} {a} {b}.
RunMiniProtocol mode initiatorCtx responderCtx bytes m a b
-> MiniProtocol mode initiatorCtx responderCtx bytes m a b
localMsgSubmission RunMiniProtocolWithMinimalCtx appType ntcAddr bytes m a b
msgSubmissionProtocol
        , RunMiniProtocolWithMinimalCtx appType ntcAddr bytes m a b
-> MiniProtocol
     appType
     (MinimalInitiatorContext ntcAddr)
     (ResponderContext ntcAddr)
     bytes
     m
     a
     b
forall {mode :: Mode} {initiatorCtx} {responderCtx} {bytes}
       {m :: * -> *} {a} {b}.
RunMiniProtocol mode initiatorCtx responderCtx bytes m a b
-> MiniProtocol mode initiatorCtx responderCtx bytes m a b
localMsgNotification RunMiniProtocolWithMinimalCtx appType ntcAddr bytes m a b
msgNotificationProtocol
        ]
  where
    localMsgSubmission :: RunMiniProtocol mode initiatorCtx responderCtx bytes m a b
-> MiniProtocol mode initiatorCtx responderCtx bytes m a b
localMsgSubmission RunMiniProtocol mode initiatorCtx responderCtx bytes m a b
protocol = MiniProtocol {
        miniProtocolNum :: MiniProtocolNum
miniProtocolNum    = Word16 -> MiniProtocolNum
MiniProtocolNum Word16
14,
        miniProtocolStart :: StartOnDemandOrEagerly
miniProtocolStart  = StartOnDemandOrEagerly
StartOnDemand,
        miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits = MiniProtocolLimits
maximumMiniProtocolLimits,
        miniProtocolRun :: RunMiniProtocol mode initiatorCtx responderCtx bytes m a b
miniProtocolRun    = RunMiniProtocol mode initiatorCtx responderCtx bytes m a b
protocol
      }
    localMsgNotification :: RunMiniProtocol mode initiatorCtx responderCtx bytes m a b
-> MiniProtocol mode initiatorCtx responderCtx bytes m a b
localMsgNotification RunMiniProtocol mode initiatorCtx responderCtx bytes m a b
protocol = MiniProtocol {
        miniProtocolNum :: MiniProtocolNum
miniProtocolNum    = Word16 -> MiniProtocolNum
MiniProtocolNum Word16
15,
        miniProtocolStart :: StartOnDemandOrEagerly
miniProtocolStart  = StartOnDemandOrEagerly
StartOnDemand,
        miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits = MiniProtocolLimits
maximumMiniProtocolLimits,
        miniProtocolRun :: RunMiniProtocol mode initiatorCtx responderCtx bytes m a b
miniProtocolRun    = RunMiniProtocol mode initiatorCtx responderCtx bytes m a b
protocol
    }
    maximumMiniProtocolLimits :: MiniProtocolLimits
maximumMiniProtocolLimits =
      MiniProtocolLimits {
        maximumIngressQueue :: Int
maximumIngressQueue = Int
0xffffffff
      }