{-# LANGUAGE BangPatterns        #-}
{-# LANGUAGE DataKinds           #-}
{-# LANGUAGE GADTs               #-}
{-# LANGUAGE KindSignatures      #-}
{-# LANGUAGE LambdaCase          #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Ouroboros.Network.TxSubmission.Inbound.V2
  ( -- * TxSubmission Inbound client
    txSubmissionInboundV2
    -- * PeerTxAPI
  , withPeer
  , PeerTxAPI
    -- * Supporting types
  , module V2
  , TxChannelsVar
  , newTxChannelsVar
  , TxMempoolSem
  , newTxMempoolSem
  , SharedTxStateVar
  , newSharedTxStateVar
  , TxDecisionPolicy (..)
  , defaultTxDecisionPolicy
  ) where

import Data.List.NonEmpty qualified as NonEmpty
import Data.Map.Strict qualified as Map
import Data.Sequence.Strict qualified as StrictSeq
import Data.Set qualified as Set

import Control.Exception (assert)
import Control.Monad (unless, when)
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer, traceWith)

import Network.TypedProtocol

import Ouroboros.Network.Protocol.TxSubmission2.Server
import Ouroboros.Network.TxSubmission.Inbound.V2.Policy
import Ouroboros.Network.TxSubmission.Inbound.V2.Registry
import Ouroboros.Network.TxSubmission.Inbound.V2.State
import Ouroboros.Network.TxSubmission.Inbound.V2.Types as V2

-- | A tx-submission inbound side (server, sic!).
--
-- After the optional initial delay the server blocks on receiving a
-- 'TxDecision' from the decision logic.  If there are txs to download it
-- pipelines two requests: first for txs, second for txids.  If there are no
-- txs to download, it sends either a blocking or a non-blocking (pipelined)
-- request for txids; when the decision asks for no txids at all, it goes
-- back to waiting for the next decision.
--
txSubmissionInboundV2
  :: forall txid tx idx m.
     ( MonadDelay m
     , MonadThrow m
     , Ord txid
     )
  => Tracer m (TraceTxSubmissionInbound txid tx)
  -- ^ tracer for inbound events
  -> TxSubmissionInitDelay
  -- ^ optional delay applied once, before the first request
  -> TxSubmissionMempoolWriter txid tx idx m
  -- ^ only its 'txId' field is used here, to compute the id of a received tx
  -> PeerTxAPI m txid tx
  -- ^ per-peer callbacks: reading decisions, reporting received txids / txs
  -- and submitting txs to the mempool
  -> TxSubmissionServerPipelined txid tx m ()
txSubmissionInboundV2
    tracer
    initDelay
    TxSubmissionMempoolWriter { txId }
    PeerTxAPI {
      readTxDecision,
      handleReceivedTxIds,
      handleReceivedTxs,
      submitTxToMempool
    }
    =
    TxSubmissionServerPipelined $ do
      -- Optionally wait before entering the main loop.
      case initDelay of
        TxSubmissionInitDelay delay -> threadDelay delay
        NoTxSubmissionInitDelay     -> return ()
      serverIdle
  where
    -- Idle state with no outstanding pipelined requests: wait for the next
    -- decision, submit the txs it selected for the mempool and then issue
    -- the next request(s).
    serverIdle
      :: m (ServerStIdle Z txid tx m ())
    serverIdle = do
        -- Block on next decision.
        txd@TxDecision { txdTxsToRequest = txsToRequest,
                         txdTxsToMempool = TxsToMempool { listOfTxsToMempool } }
          <- readTxDecision
        traceWith tracer (TraceTxInboundDecision txd)

        -- Only attempt to add TXs if we have some work to do.  `null` is used
        -- instead of comparing `length` with zero, so the list is not
        -- traversed just to learn whether it is empty.
        when (not (null listOfTxsToMempool)) $
          -- submitTxToMempool traces:
          -- * `TraceTxSubmissionProcessed`,
          -- * `TraceTxInboundAddedToMempool`, and
          -- * `TraceTxInboundRejectedFromMempool`
          -- events.
          mapM_ (uncurry $ submitTxToMempool tracer) listOfTxsToMempool

        -- TODO:
        -- We can update the state so that other `tx-submission` servers will
        -- not try to add these txs to the mempool.
        if Map.null txsToRequest
          then serverReqTxIds Zero txd
          else serverReqTxs txd

    -- Pipelined request of txs.  The continuation goes straight to
    -- `serverReqTxIds` with one outstanding reply (`Succ Zero`), so the txid
    -- request of the same decision is sent before the txs are collected.
    serverReqTxs :: TxDecision txid tx
                 -> m (ServerStIdle Z txid tx m ())
    serverReqTxs txd@TxDecision { txdTxsToRequest } =
      pure $ SendMsgRequestTxsPipelined txdTxsToRequest
                                        (serverReqTxIds (Succ Zero) txd)


    -- Request txids according to the decision, given `n` outstanding
    -- pipelined replies.  The equations are tried in order:
    --
    -- 1. no txids to request: send nothing; either return to `serverIdle`
    --    (nothing outstanding) or start collecting the outstanding replies;
    -- 2. nothing outstanding and pipelining not requested: blocking request;
    -- 3. nothing outstanding and pipelining requested: pipelined request;
    -- 4. replies outstanding: pipelined request.
    serverReqTxIds :: forall (n :: N).
                      Nat n
                   -> TxDecision txid tx
                   -> m (ServerStIdle n txid tx m ())
    serverReqTxIds
      n TxDecision { txdTxIdsToRequest = 0 }
      =
      case n of
        Zero   -> serverIdle
        Succ _ -> handleReplies n

    serverReqTxIds
      -- if there are no unacknowledged txids, the protocol requires sending
      -- a blocking `MsgRequestTxIds` request.  This is important, as otherwise
      -- the client side wouldn't have a chance to terminate the
      -- mini-protocol.
      Zero TxDecision { txdTxIdsToAcknowledge = txIdsToAck,
                        txdPipelineTxIds      = False,
                        txdTxIdsToRequest     = txIdsToReq
                      }
      =
      pure $ SendMsgRequestTxIdsBlocking
                txIdsToAck txIdsToReq
                -- Our result if the client terminates the protocol
                (traceWith tracer TraceTxInboundTerminated)
                (\txids -> do
                   let txids'   = NonEmpty.toList txids
                       txidsSeq = StrictSeq.fromList $ fst <$> txids'
                       txidsMap = Map.fromList txids'
                   -- the peer must not reply with more txids than requested
                   unless (StrictSeq.length txidsSeq <= fromIntegral txIdsToReq) $
                     throwIO ProtocolErrorTxIdsNotRequested
                   handleReceivedTxIds txIdsToReq txidsSeq txidsMap
                   serverIdle
                )

    serverReqTxIds
      n@Zero TxDecision { txdTxIdsToAcknowledge = txIdsToAck,
                          txdPipelineTxIds      = True,
                          txdTxIdsToRequest     = txIdsToReq
                        }
      =
      pure $ SendMsgRequestTxIdsPipelined
                txIdsToAck txIdsToReq
                (handleReplies (Succ n))

    serverReqTxIds
      n@Succ{} TxDecision { txdTxIdsToAcknowledge = txIdsToAck,
                            txdPipelineTxIds,
                            txdTxIdsToRequest     = txIdsToReq
                          }
      =
      -- it is impossible that we have had `tx`s to request (`Succ{}` is
      -- evidence of that), but no unacknowledged `txid`s.
      assert txdPipelineTxIds $
      pure $ SendMsgRequestTxIdsPipelined
               txIdsToAck txIdsToReq
               (handleReplies (Succ n))


    -- Collect all outstanding pipelined replies, one at a time.  `Nothing`
    -- is passed as the first argument of `CollectPipelined`, i.e. no
    -- alternative action is offered while a reply is pending.  Once the last
    -- reply has been handled the server returns to `serverIdle`.
    handleReplies :: forall (n :: N).
                   Nat (S n)
                -> m (ServerStIdle (S n) txid tx m ())
    handleReplies (Succ n'@Succ{}) =
      -- more than one reply outstanding: handle one, then recurse
      pure $ CollectPipelined
                Nothing
                (handleReply (handleReplies n'))

    handleReplies (Succ Zero) =
      -- last outstanding reply: handle it, then go back to idle
      pure $ CollectPipelined
                Nothing
                (handleReply serverIdle)

    -- Process a single collected reply and then run the continuation `k`.
    --
    -- * `CollectTxIds`: throws `ProtocolErrorTxIdsNotRequested` if more
    --   txids were received than requested, otherwise passes them to
    --   `handleReceivedTxIds`.
    -- * `CollectTxs`: throws `ProtocolErrorTxNotRequested` if a received tx
    --   was not among the requested ones, otherwise passes the txs to
    --   `handleReceivedTxs`; an error returned by it is traced and thrown.
    handleReply :: forall (n :: N).
                   m (ServerStIdle n txid tx m ())
                   -- continuation
                -> Collect txid tx
                -> m (ServerStIdle n txid tx m ())
    handleReply k = \case
      CollectTxIds txIdsToReq txids -> do
        let txidsSeq = StrictSeq.fromList $ fst <$> txids
            txidsMap = Map.fromList txids
        unless (StrictSeq.length txidsSeq <= fromIntegral txIdsToReq) $
          throwIO ProtocolErrorTxIdsNotRequested
        handleReceivedTxIds txIdsToReq txidsSeq txidsMap
        k
      CollectTxs txids txs -> do
        let requested = Map.keysSet txids
            -- index the received txs by the id computed from their content
            received  = Map.fromList [ (txId tx, tx) | tx <- txs ]

        unless (Map.keysSet received `Set.isSubsetOf` requested) $
          throwIO ProtocolErrorTxNotRequested

        mbe <- handleReceivedTxs txids received
        traceWith tracer $ TraceTxSubmissionCollected (map txId txs)
        case mbe of
          -- one of `tx`s had a wrong size
          Just e  -> traceWith tracer (TraceTxInboundError e)
                  >> throwIO e
          Nothing -> k