{-# LANGUAGE BangPatterns        #-}
{-# LANGUAGE CPP                 #-}
{-# LANGUAGE DataKinds           #-}
{-# LANGUAGE DeriveGeneric       #-}
{-# LANGUAGE GADTs               #-}
{-# LANGUAGE KindSignatures      #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE ScopedTypeVariables #-}

{-# OPTIONS_GHC -Wno-partial-fields #-}

module Ouroboros.Network.TxSubmission.Inbound
  ( txSubmissionInbound
  , TxSubmissionMempoolWriter (..)
  , TraceTxSubmissionInbound (..)
  , TxSubmissionProtocolError (..)
  , ProcessedTxCount (..)
  ) where

import Data.Foldable as Foldable (foldl', toList)
import Data.List.NonEmpty qualified as NonEmpty
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe
import Data.Sequence.Strict (StrictSeq)
import Data.Sequence.Strict qualified as Seq
import Data.Set qualified as Set
import Data.Word (Word16)
import GHC.Generics (Generic)
import NoThunks.Class (NoThunks (..), unsafeNoThunks)

import Cardano.Prelude (forceElemsToWHNF)

import Control.Concurrent.Class.MonadSTM.Strict.TVar.Checked
import Control.Exception (assert)
import Control.Monad (unless)
import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer, traceWith)

import Network.TypedProtocol.Core (N, Nat (..), natToInt)

import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion)
import Ouroboros.Network.Protocol.Limits
import Ouroboros.Network.Protocol.TxSubmission2.Server
import Ouroboros.Network.Protocol.TxSubmission2.Type
import Ouroboros.Network.TxSubmission.Mempool.Reader (MempoolSnapshot (..),
           TxSubmissionMempoolReader (..))

-- | The consensus layer functionality that the inbound side of the tx
-- submission logic requires.
--
-- This is provided to the tx submission logic by the consensus layer.
--
data TxSubmissionMempoolWriter txid tx idx m =
     TxSubmissionMempoolWriter {

       -- | Compute the transaction id from a transaction.
       --
       -- This is used in the protocol handler to verify a full transaction
       -- matches a previously given transaction id.
       --
       txId          :: tx -> txid,

       -- | Supply a batch of transactions to the mempool. They are either
       -- accepted or rejected individually, but in the order supplied.
       --
       -- The 'txid's of all transactions that were added successfully are
       -- returned.
       mempoolAddTxs :: [tx] -> m [txid]
    }

-- | Pass/fail breakdown for a batch of transactions.
data ProcessedTxCount = ProcessedTxCount {
      -- | Just accepted this many transactions.
      ptxcAccepted :: Int
      -- | Just rejected this many transactions.
    , ptxcRejected :: Int
    }
  deriving (Eq, Show)

-- | Trace events emitted by the inbound side of the tx submission protocol.
data TraceTxSubmissionInbound txid tx =
    -- | Number of transactions just about to be inserted.
    TraceTxSubmissionCollected Int
    -- | Just processed transaction pass/fail breakdown.
  | TraceTxSubmissionProcessed ProcessedTxCount
    -- | Server received 'MsgDone'
  | TraceTxInboundTerminated
    -- | Emitted from the idle state when there are available txids for which
    -- transactions can be requested. The 'Int' is the value of the pipelining
    -- counter at that point.
  | TraceTxInboundCanRequestMoreTxs Int
    -- | Emitted from the idle state when there are no available txids to
    -- request transactions for. The 'Int' is the value of the pipelining
    -- counter at that point.
  | TraceTxInboundCannotRequestMoreTxs Int
  deriving (Eq, Show)

-- | Protocol violations by the peer that the inbound server detects and
-- throws as exceptions.
data TxSubmissionProtocolError =
       -- | The peer sent a transaction that was not among those requested.
       ProtocolErrorTxNotRequested
       -- | The peer replied with more txids than were requested.
     | ProtocolErrorTxIdsNotRequested
  deriving Show

-- | Human-readable descriptions of the protocol violations; one message per
-- constructor.
instance Exception TxSubmissionProtocolError where
  displayException ProtocolErrorTxNotRequested =
      "The peer replied with a transaction we did not ask for."
  displayException ProtocolErrorTxIdsNotRequested =
      "The peer replied with more txids than we asked for."


-- | Information maintained internally in the 'txSubmissionInbound' server
-- implementation.
--
data ServerState txid tx = ServerState {
       -- | The number of transaction identifiers that we have requested but
       -- which have not yet been replied to. We need to track this to keep
       -- our requests within the limit on the number of unacknowledged txids.
       --
       requestedTxIdsInFlight :: !Word16,

       -- | Those transactions (by their identifier) that the client has told
       -- us about, and which we have not yet acknowledged. This is kept in
       -- the order in which the client gave them to us. This is the same order
       -- in which we submit them to the mempool (or for this example, the final
       -- result order). It is also the order we acknowledge in.
       unacknowledgedTxIds    :: !(StrictSeq txid),

       -- | Those transactions (by their identifier) that we can request. These
       -- are a subset of the 'unacknowledgedTxIds' that we have not yet
       -- requested. This is not ordered to illustrate the fact that we can
       -- request txs out of order. We also remember the size.
       availableTxids         :: !(Map txid SizeInBytes),

       -- | Transactions we have successfully downloaded but have not yet added
       -- to the mempool or acknowledged. This is needed because we can request
       -- transactions out of order but must use the original order when adding
       -- to the mempool or acknowledging transactions.
       --
       -- However, it's worth noting that, in a few situations, some of the
       -- transaction IDs in this 'Map' may be mapped to 'Nothing':
       --
       -- * Transaction IDs mapped to 'Nothing' can represent transaction IDs
       --   that were requested, but not received. This can occur because the
       --   client will not necessarily send all of the transactions that we
       --   asked for, but we still need to acknowledge those transactions.
       --
       --   For example, if we request a transaction that no longer exists in
       --   the client's mempool, the client will just exclude it from the
       --   response. However, we still need to acknowledge it (i.e. remove it
       --   from the 'unacknowledgedTxIds') in order to note that we're no
       --   longer awaiting receipt of that transaction.
       --
       -- * Transaction IDs mapped to 'Nothing' can represent transactions
       --   that were not requested from the client because they're already
       --   in the mempool.
       --
       --   For example, if we request some transaction IDs and notice that
       --   some subset of them are already in the mempool, we wouldn't
       --   want to bother asking for those specific transactions. Therefore,
       --   we would just insert those transaction IDs mapped to 'Nothing' to
       --   the 'bufferedTxs' such that those transactions are acknowledged,
       --   but never actually requested.
       --
       bufferedTxs            :: !(Map txid (Maybe tx)),

       -- | The number of transactions we can acknowledge on our next request
       -- for more transactions. The txids counted here have already been
       -- removed from 'unacknowledgedTxIds'.
       --
       numTxsToAcknowledge    :: !Word16
     }
  deriving (Show, Generic)

-- | No methods are overridden here, so thunk checking of 'ServerState' uses
-- the class's default method implementations.
instance (NoThunks txid, NoThunks tx) => NoThunks (ServerState txid tx)

-- | The state the server starts in: nothing requested, nothing
-- unacknowledged, nothing available or buffered, and nothing to acknowledge.
--
-- Built with record syntax so that the two 'Word16' counters and the two
-- 'Map's cannot be silently transposed.
initialServerState :: ServerState txid tx
initialServerState = ServerState {
      requestedTxIdsInFlight = 0,
      unacknowledgedTxIds    = Seq.empty,
      availableTxids         = Map.empty,
      bufferedTxs            = Map.empty,
      numTxsToAcknowledge    = 0
    }


txSubmissionInbound
  :: forall txid tx idx m.
     ( Ord txid
     , NoThunks txid
     , NoThunks tx
     , MonadSTM m
     , MonadThrow m
     , MonadDelay m
     )
  => Tracer m (TraceTxSubmissionInbound txid tx)
  -> NumTxIdsToAck  -- ^ Maximum number of unacknowledged txids allowed
  -> TxSubmissionMempoolReader txid tx idx m
  -> TxSubmissionMempoolWriter txid tx idx m
  -> NodeToNodeVersion
  -> TxSubmissionServerPipelined txid tx m ()
txSubmissionInbound :: forall txid tx idx (m :: * -> *).
(Ord txid, NoThunks txid, NoThunks tx, MonadSTM m, MonadThrow m,
 MonadDelay m) =>
Tracer m (TraceTxSubmissionInbound txid tx)
-> NumTxIdsToAck
-> TxSubmissionMempoolReader txid tx idx m
-> TxSubmissionMempoolWriter txid tx idx m
-> NodeToNodeVersion
-> TxSubmissionServerPipelined txid tx m ()
txSubmissionInbound Tracer m (TraceTxSubmissionInbound txid tx)
tracer (NumTxIdsToAck Word16
maxUnacked) TxSubmissionMempoolReader txid tx idx m
mpReader TxSubmissionMempoolWriter txid tx idx m
mpWriter NodeToNodeVersion
_version =
    m (ServerStIdle 'Z txid tx m ())
-> TxSubmissionServerPipelined txid tx m ()
forall (m :: * -> *) txid tx a.
m (ServerStIdle 'Z txid tx m a)
-> TxSubmissionServerPipelined txid tx m a
TxSubmissionServerPipelined (m (ServerStIdle 'Z txid tx m ())
 -> TxSubmissionServerPipelined txid tx m ())
-> m (ServerStIdle 'Z txid tx m ())
-> TxSubmissionServerPipelined txid tx m ()
forall a b. (a -> b) -> a -> b
$ do
#ifdef TXSUBMISSION_DELAY
      -- make the client linger before asking for tx's and expending
      -- our resources as well, as he may disconnect for some reason
      DiffTime -> m ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay (DiffTime -> Maybe DiffTime -> DiffTime
forall a. a -> Maybe a -> a
fromMaybe (-DiffTime
1) Maybe DiffTime
longWait)
#endif
      StatefulM (ServerState txid tx) 'Z txid tx m
-> ServerState txid tx -> m (ServerStIdle 'Z txid tx m ())
forall s (n :: N) txid tx (m :: * -> *).
NoThunks s =>
StatefulM s n txid tx m -> s -> m (ServerStIdle n txid tx m ())
continueWithStateM (Nat 'Z -> StatefulM (ServerState txid tx) 'Z txid tx m
forall (n :: N).
Nat n -> StatefulM (ServerState txid tx) n txid tx m
serverIdle Nat 'Z
forall (n :: N). ('Z ~ n) => Nat n
Zero) ServerState txid tx
forall txid tx. ServerState txid tx
initialServerState
  where
    -- TODO #1656: replace these fixed limits by policies based on
    -- SizeInBytes and delta-Q and the bandwidth/delay product.
    -- These numbers are for demo purposes only, the throughput will be low.
    -- Upper bound on the number of txids asked for in a single request.
    maxTxIdsToRequest :: Word16
    maxTxIdsToRequest = 3

    -- Fixed limit for requesting transactions; not used within the part of
    -- the where-clause visible here.
    maxTxToRequest :: Word16
    maxTxToRequest = 2

    -- Bring the mempool reader's snapshot action into scope as a plain
    -- local name (NamedFieldPuns).
    TxSubmissionMempoolReader{mempoolGetSnapshot} = mpReader

    -- Likewise bring both fields of the mempool writer into scope.
    TxSubmissionMempoolWriter
      { txId
      , mempoolAddTxs
      } = mpWriter

    -- The idle state of the server. The argument @n@ is the pipelining
    -- counter; 'Zero' means there are no replies in flight.
    --
    -- In every case a trace event records whether more transactions can be
    -- requested, together with the current value of the counter. Then:
    --
    -- * 'Zero', txs available: continue with 'serverReqTxs'.
    -- * 'Zero', nothing available: issue a blocking request for txids.
    -- * 'Succ', txs available: continue with 'serverReqTxs', but also
    --   collect a reply if one is available.
    -- * 'Succ', nothing available: block until a reply is collected.
    serverIdle :: forall (n :: N).
                  Nat n
               -> StatefulM (ServerState txid tx) n txid tx m
    serverIdle n = StatefulM $ \st -> case n of
        Zero -> do
          if canRequestMoreTxs st
          then do
            -- There are no replies in flight, but we do know some more txs we
            -- can ask for, so lets ask for them and more txids.
            traceWith tracer (TraceTxInboundCanRequestMoreTxs (natToInt n))
            pure $ continueWithState (serverReqTxs Zero) st

          else do
            traceWith tracer (TraceTxInboundCannotRequestMoreTxs (natToInt n))
            -- There's no replies in flight, and we have no more txs we can
            -- ask for so the only remaining thing to do is to ask for more
            -- txids. Since this is the only thing to do now, we make this a
            -- blocking call.
            let numTxIdsToRequest = maxTxIdsToRequest `min` maxUnacked
            -- A blocking request is only expected when every piece of
            -- per-txid state has been drained.
            assert (requestedTxIdsInFlight st == 0
                  && Seq.null (unacknowledgedTxIds st)
                  && Map.null (availableTxids st)
                  && Map.null (bufferedTxs st)) $
              pure $
              SendMsgRequestTxIdsBlocking
                (NumTxIdsToAck (numTxsToAcknowledge st))
                (NumTxIdsToReq numTxIdsToRequest)
                -- Our result if the client terminates the protocol
                (traceWith tracer TraceTxInboundTerminated)
                ( collectAndContinueWithState (handleReply Zero) st {
                    numTxsToAcknowledge    = 0,
                    requestedTxIdsInFlight = numTxIdsToRequest
                  }
                . CollectTxIds (NumTxIdsToReq numTxIdsToRequest)
                . NonEmpty.toList)

        Succ n' -> if canRequestMoreTxs st
          then do
            -- We have replies in flight and we should eagerly collect them if
            -- available, but there are transactions to request too so we
            -- should not block waiting for replies.
            --
            -- Having requested more transactions, we opportunistically ask
            -- for more txids in a non-blocking way. This is how we pipeline
            -- asking for both txs and txids.
            --
            -- It's important not to pipeline more requests for txids when we
            -- have no txs to ask for, since (with no other guard) this will
            -- put us into a busy-polling loop.
            --
            traceWith tracer (TraceTxInboundCanRequestMoreTxs (natToInt n))
            pure $ CollectPipelined
              (Just (continueWithState (serverReqTxs (Succ n')) st))
              (collectAndContinueWithState (handleReply n') st)

          else do
            traceWith tracer (TraceTxInboundCannotRequestMoreTxs (natToInt n))
            -- In this case there is nothing else to do so we block until we
            -- collect a reply.
            pure $ CollectPipelined
              Nothing
              (collectAndContinueWithState (handleReply n') st)
      where
        -- We can request more transactions exactly when the set of
        -- available (announced but not yet requested) txids is non-empty.
        canRequestMoreTxs :: ServerState k tx -> Bool
        canRequestMoreTxs st =
            not (Map.null (availableTxids st))

    handleReply :: forall (n :: N).
                   Nat n
                -> StatefulCollect (ServerState txid tx) n txid tx m
    handleReply :: forall (n :: N).
Nat n -> StatefulCollect (ServerState txid tx) n txid tx m
handleReply Nat n
n = (ServerState txid tx
 -> Collect txid tx -> m (ServerStIdle n txid tx m ()))
-> StatefulCollect (ServerState txid tx) n txid tx m
forall s (n :: N) txid tx (m :: * -> *).
(s -> Collect txid tx -> m (ServerStIdle n txid tx m ()))
-> StatefulCollect s n txid tx m
StatefulCollect ((ServerState txid tx
  -> Collect txid tx -> m (ServerStIdle n txid tx m ()))
 -> StatefulCollect (ServerState txid tx) n txid tx m)
-> (ServerState txid tx
    -> Collect txid tx -> m (ServerStIdle n txid tx m ()))
-> StatefulCollect (ServerState txid tx) n txid tx m
forall a b. (a -> b) -> a -> b
$ \ServerState txid tx
st Collect txid tx
collect -> case Collect txid tx
collect of
      CollectTxIds (NumTxIdsToReq Word16
reqNo) [(txid, SizeInBytes)]
txids -> do
        -- Check they didn't send more than we asked for. We don't need to
        -- check for a minimum: the blocking case checks for non-zero
        -- elsewhere, and for the non-blocking case it is quite normal for
        -- them to send us none.
        let txidsSeq :: StrictSeq txid
txidsSeq = [txid] -> StrictSeq txid
forall a. [a] -> StrictSeq a
Seq.fromList (((txid, SizeInBytes) -> txid) -> [(txid, SizeInBytes)] -> [txid]
forall a b. (a -> b) -> [a] -> [b]
map (txid, SizeInBytes) -> txid
forall a b. (a, b) -> a
fst [(txid, SizeInBytes)]
txids)
            txidsMap :: Map txid SizeInBytes
txidsMap = [(txid, SizeInBytes)] -> Map txid SizeInBytes
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList [(txid, SizeInBytes)]
txids

        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (StrictSeq txid -> Int
forall a. StrictSeq a -> Int
Seq.length StrictSeq txid
txidsSeq Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Word16 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word16
reqNo) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
          TxSubmissionProtocolError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO TxSubmissionProtocolError
ProtocolErrorTxIdsNotRequested

        -- Upon receiving a batch of new txids we extend our available set,
        -- and extended the unacknowledged sequence.
        --
        -- We also pre-emptively acknowledge those txids that are already in
        -- the mempool. This prevents us from requesting their corresponding
        -- transactions again in the future.
        let st' :: ServerState txid tx
st' = ServerState txid tx
st {
          requestedTxIdsInFlight = requestedTxIdsInFlight st - reqNo
        }
        mpSnapshot <- STM m (MempoolSnapshot txid tx idx)
-> m (MempoolSnapshot txid tx idx)
forall a. (?callStack::CallStack) => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, ?callStack::CallStack) =>
STM m a -> m a
atomically STM m (MempoolSnapshot txid tx idx)
mempoolGetSnapshot
        continueWithStateM
          (serverIdle n)
          (acknowledgeTxIds st' txidsSeq txidsMap mpSnapshot)

      CollectTxs [txid]
txids [tx]
txs -> do
        -- To start with we have to verify that the txs they have sent us do
        -- correspond to the txs we asked for. This is slightly complicated by
        -- the fact that in general we get a subset of the txs that we asked
        -- for. We should never get a tx we did not ask for. We take a strict
        -- approach to this and check it.
        --
        let txsMap :: Map txid tx
            txsMap :: Map txid tx
txsMap = [(txid, tx)] -> Map txid tx
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList [ (tx -> txid
txId tx
tx, tx
tx) | tx
tx <- [tx]
txs ]

            txidsReceived :: Set txid
txidsReceived  = Map txid tx -> Set txid
forall k a. Map k a -> Set k
Map.keysSet Map txid tx
txsMap
            txidsRequested :: Set txid
txidsRequested = [txid] -> Set txid
forall a. Ord a => [a] -> Set a
Set.fromList [txid]
txids

        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Set txid
txidsReceived Set txid -> Set txid -> Bool
forall a. Ord a => Set a -> Set a -> Bool
`Set.isSubsetOf` Set txid
txidsRequested) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
          TxSubmissionProtocolError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO TxSubmissionProtocolError
ProtocolErrorTxNotRequested

            -- We can match up all the txids we requested, with those we
            -- received.
        let txIdsRequestedWithTxsReceived :: Map txid (Maybe tx)
            txIdsRequestedWithTxsReceived :: Map txid (Maybe tx)
txIdsRequestedWithTxsReceived =
                (tx -> Maybe tx) -> Map txid tx -> Map txid (Maybe tx)
forall a b k. (a -> b) -> Map k a -> Map k b
Map.map tx -> Maybe tx
forall a. a -> Maybe a
Just Map txid tx
txsMap
             Map txid (Maybe tx) -> Map txid (Maybe tx) -> Map txid (Maybe tx)
forall a. Semigroup a => a -> a -> a
<> (txid -> Maybe tx) -> Set txid -> Map txid (Maybe tx)
forall k a. (k -> a) -> Set k -> Map k a
Map.fromSet (Maybe tx -> txid -> Maybe tx
forall a b. a -> b -> a
const Maybe tx
forall a. Maybe a
Nothing) Set txid
txidsRequested

            -- We still have to acknowledge the txids we were given. This
            -- combined with the fact that we request txs out of order means
            -- our bufferedTxs has to track all the txids we asked for, even
            -- though not all have replies.
            bufferedTxs1 :: Map txid (Maybe tx)
bufferedTxs1 = ServerState txid tx -> Map txid (Maybe tx)
forall txid tx. ServerState txid tx -> Map txid (Maybe tx)
bufferedTxs ServerState txid tx
st Map txid (Maybe tx) -> Map txid (Maybe tx) -> Map txid (Maybe tx)
forall a. Semigroup a => a -> a -> a
<> Map txid (Maybe tx)
txIdsRequestedWithTxsReceived

            -- We have to update the unacknowledgedTxIds here eagerly and not
            -- delay it to serverReqTxs, otherwise we could end up blocking in
            -- serverIdle on more pipelined results rather than being able to
            -- move on.

            -- Check if having received more txs we can now confirm any (in
            -- strict order in the unacknowledgedTxIds sequence).
            (StrictSeq txid
acknowledgedTxIds, StrictSeq txid
unacknowledgedTxIds') =
              (txid -> Bool)
-> StrictSeq txid -> (StrictSeq txid, StrictSeq txid)
forall a. (a -> Bool) -> StrictSeq a -> (StrictSeq a, StrictSeq a)
Seq.spanl (txid -> Map txid (Maybe tx) -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`Map.member` Map txid (Maybe tx)
bufferedTxs1) (ServerState txid tx -> StrictSeq txid
forall txid tx. ServerState txid tx -> StrictSeq txid
unacknowledgedTxIds ServerState txid tx
st)

            -- If so we can submit the acknowledged txs to our local mempool
            txsReady :: [tx]
txsReady = (txid -> [tx] -> [tx]) -> [tx] -> StrictSeq txid -> [tx]
forall a b. (a -> b -> b) -> b -> StrictSeq a -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr (\txid
txid [tx]
r -> [tx] -> (tx -> [tx]) -> Maybe tx -> [tx]
forall b a. b -> (a -> b) -> Maybe a -> b
maybe [tx]
r (tx -> [tx] -> [tx]
forall a. a -> [a] -> [a]
:[tx]
r) (Map txid (Maybe tx)
bufferedTxs1 Map txid (Maybe tx) -> txid -> Maybe tx
forall k a. Ord k => Map k a -> k -> a
Map.! txid
txid))
                             [] StrictSeq txid
acknowledgedTxIds

            -- And remove acknowledged txs from our buffer
            bufferedTxs2 :: Map txid (Maybe tx)
bufferedTxs2 = (Map txid (Maybe tx) -> txid -> Map txid (Maybe tx))
-> Map txid (Maybe tx) -> StrictSeq txid -> Map txid (Maybe tx)
forall b a. (b -> a -> b) -> b -> StrictSeq a -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
Foldable.foldl' ((txid -> Map txid (Maybe tx) -> Map txid (Maybe tx))
-> Map txid (Maybe tx) -> txid -> Map txid (Maybe tx)
forall a b c. (a -> b -> c) -> b -> a -> c
flip txid -> Map txid (Maybe tx) -> Map txid (Maybe tx)
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete)
                                   Map txid (Maybe tx)
bufferedTxs1 StrictSeq txid
acknowledgedTxIds

            -- If we are acknowledging transactions that are still in
            -- unacknowledgedTxIds' we need to re-add them so that we also can
            -- acknowledge them again later. This will happen in case of
            -- duplicate txids within the same window.
            live :: [txid]
live = (txid -> Bool) -> [txid] -> [txid]
forall a. (a -> Bool) -> [a] -> [a]
filter (txid -> StrictSeq txid -> Bool
forall a. Eq a => a -> StrictSeq a -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` StrictSeq txid
unacknowledgedTxIds') ([txid] -> [txid]) -> [txid] -> [txid]
forall a b. (a -> b) -> a -> b
$ StrictSeq txid -> [txid]
forall a. StrictSeq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList StrictSeq txid
acknowledgedTxIds
            bufferedTxs3 :: Map txid (Maybe tx)
bufferedTxs3 = Map txid (Maybe tx) -> Map txid (Maybe tx)
forall (t :: * -> *) a. Foldable t => t a -> t a
forceElemsToWHNF (Map txid (Maybe tx) -> Map txid (Maybe tx))
-> Map txid (Maybe tx) -> Map txid (Maybe tx)
forall a b. (a -> b) -> a -> b
$ Map txid (Maybe tx)
bufferedTxs2 Map txid (Maybe tx) -> Map txid (Maybe tx) -> Map txid (Maybe tx)
forall a. Semigroup a => a -> a -> a
<>
                               [(txid, Maybe tx)] -> Map txid (Maybe tx)
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList ([txid] -> [Maybe tx] -> [(txid, Maybe tx)]
forall a b. [a] -> [b] -> [(a, b)]
zip [txid]
live (Maybe tx -> [Maybe tx]
forall a. a -> [a]
repeat Maybe tx
forall a. Maybe a
Nothing))

        let !collected :: Int
collected = [tx] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [tx]
txs
        Tracer m (TraceTxSubmissionInbound txid tx)
-> TraceTxSubmissionInbound txid tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceTxSubmissionInbound txid tx)
tracer (TraceTxSubmissionInbound txid tx -> m ())
-> TraceTxSubmissionInbound txid tx -> m ()
forall a b. (a -> b) -> a -> b
$
          Int -> TraceTxSubmissionInbound txid tx
forall txid tx. Int -> TraceTxSubmissionInbound txid tx
TraceTxSubmissionCollected Int
collected

        txidsAccepted <- [tx] -> m [txid]
mempoolAddTxs [tx]
txsReady

        let !accepted = [txid] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [txid]
txidsAccepted

        traceWith tracer $ TraceTxSubmissionProcessed ProcessedTxCount {
            ptxcAccepted = accepted
          , ptxcRejected = collected - accepted
          }

        continueWithStateM (serverIdle n) st {
          bufferedTxs         = bufferedTxs3,
          unacknowledgedTxIds = unacknowledgedTxIds',
          numTxsToAcknowledge = numTxsToAcknowledge st
                              + fromIntegral (Seq.length acknowledgedTxIds)
        }

    -- Pre-emptively acknowledge those of the available transaction IDs that
    -- are already in the mempool and return the updated 'ServerState'.
    --
    -- This enables us to effectively filter out transactions that we don't
    -- need to bother requesting from the client since they're already in the
    -- mempool.
    --
    acknowledgeTxIds :: ServerState txid tx
                     -> StrictSeq txid
                     -> Map txid SizeInBytes
                     -> MempoolSnapshot txid tx idx
                     -> ServerState txid tx
    -- Nothing new was announced: the state is returned untouched.
    acknowledgeTxIds st txidsSeq _ _ | Seq.null txidsSeq  = st
    acknowledgeTxIds st txidsSeq txidsMap MempoolSnapshot{mempoolHasTx} =
        -- Return the next 'ServerState'
        st {
          availableTxids      = availableTxids',
          bufferedTxs         = bufferedTxs'',
          unacknowledgedTxIds = unacknowledgedTxIds'',
          numTxsToAcknowledge = numTxsToAcknowledge st
                              + fromIntegral (Seq.length acknowledgedTxIds)
        }
      where

        -- Divide the new txids in two: those that are already in the
        -- mempool (which we will not request) and those that are not.
        (ignoredTxids, availableTxidsMp) =
              Map.partitionWithKey
                (\txid _ -> mempoolHasTx txid)
                txidsMap

        -- The new txids that are not already tracked in the
        -- 'unacknowledgedTxIds' of the incoming state.
        availableTxidsU =
              Map.filterWithKey
                (\txid _ -> notElem txid (unacknowledgedTxIds st))
                txidsMap

        -- Only txids that are neither in the mempool nor already
        -- unacknowledged become candidates for a request. The union is
        -- left-biased, so sizes already recorded in the state are kept.
        availableTxids' =
              availableTxids st
           <> Map.intersection availableTxidsMp availableTxidsU

        -- The txs that we intentionally don't request, because they are
        -- already in the mempool, need to be acknowledged.
        --
        -- So we extend bufferedTxs with those txs (so of course they have
        -- no corresponding reply).
        bufferedTxs' = bufferedTxs st
                    <> Map.map (const Nothing) ignoredTxids

        unacknowledgedTxIds' = unacknowledgedTxIds st <> txidsSeq

        -- Check if having decided not to request more txs we can now
        -- confirm any txids (in strict order in the unacknowledgedTxIds
        -- sequence). This is used in the 'numTxsToAcknowledge' above
        -- which will then be used next time we SendMsgRequestTxIds.
        --
        (acknowledgedTxIds, unacknowledgedTxIds'') =
          Seq.spanl (`Map.member` bufferedTxs') unacknowledgedTxIds'

        -- If so we can remove acknowledged txs from our buffer provided that
        -- they are not still in unacknowledgedTxIds''. This happens in case
        -- of duplicate txids.
        bufferedTxs'' =
          forceElemsToWHNF $
            Foldable.foldl'
              (\m txid -> if elem txid unacknowledgedTxIds''
                            then m
                            else Map.delete txid m)
              bufferedTxs'
              acknowledgedTxIds

    -- Request a batch of transactions (pipelined) and then continue with
    -- 'serverReqTxIds' at pipelining depth @'S n@.
    serverReqTxs :: forall (n :: N).
                    Nat n
                 -> Stateful (ServerState txid tx) n txid tx m
    serverReqTxs n = Stateful $ \st -> do
        -- TODO: This implementation is deliberately naive, we pick in an
        -- arbitrary order and up to a fixed limit. This is to illustrate
        -- that we can request txs out of order. In the final version we will
        -- try to pick in-order and only pick out of order when we have to.
        -- We will also use the size of txs in bytes as our limit for
        -- upper and lower watermarks for pipelining. We'll also use the
        -- amount in flight and delta-Q to estimate when we're in danger of
        -- becoming idle, and need to request stalled txs.
        --
        -- 'Map.splitAt' takes the first 'maxTxToRequest' entries in key
        -- order; the remainder stays available for later requests.
        let (txsToRequest, availableTxids') =
              Map.splitAt (fromIntegral maxTxToRequest) (availableTxids st)

        SendMsgRequestTxsPipelined
          (Map.keys txsToRequest)
          (continueWithStateM (serverReqTxIds (Succ n)) st {
             availableTxids = availableTxids'
           })

    -- Request more txids (pipelined) when there is room for them, sending
    -- along the pending acknowledgement count; otherwise go straight back
    -- to 'serverIdle' with the state unchanged.
    serverReqTxIds :: forall (n :: N).
                      Nat n
                   -> StatefulM (ServerState txid tx) n txid tx m
    serverReqTxIds n = StatefulM $ \st -> do
          -- This definition is justified by the fact that the
          -- 'numTxsToAcknowledge' are not included in the
          -- 'unacknowledgedTxIds'.
          --
          -- NOTE(review): this is unchecked 'Word16' arithmetic; it assumes
          -- unacknowledged + in-flight never exceeds 'maxUnacked', otherwise
          -- the subtraction would wrap around -- confirm the invariant is
          -- enforced elsewhere.
      let numTxIdsToRequest =
                  (maxUnacked
                    - fromIntegral (Seq.length (unacknowledgedTxIds st))
                    - requestedTxIdsInFlight st)
            `min` maxTxIdsToRequest

      if numTxIdsToRequest > 0
        then pure $ SendMsgRequestTxIdsPipelined
          (NumTxIdsToAck (numTxsToAcknowledge st))
          (NumTxIdsToReq numTxIdsToRequest)
          -- The acknowledgements travel with this request, so the counter
          -- is reset while the in-flight count grows by what we asked for.
          (continueWithStateM (serverIdle (Succ n)) st {
                requestedTxIdsInFlight = requestedTxIdsInFlight st
                                       + numTxIdsToRequest,
                numTxsToAcknowledge    = 0
              })
        else continueWithStateM (serverIdle n) st

-- | A continuation over the state @s@: given the state it yields the next
-- 'ServerStIdle' step directly, without running any effect in @m@.
--
-- Unwrapped and applied by 'continueWithState'.
newtype Stateful s n txid tx m = Stateful (s -> ServerStIdle n txid tx m ())

-- | Like 'Stateful', but the next 'ServerStIdle' step is produced by an
-- action in @m@.
--
-- Unwrapped and applied by 'continueWithStateM'.
newtype StatefulM s n txid tx m
  = StatefulM (s -> m (ServerStIdle n txid tx m ()))

-- | Like 'StatefulM', but the continuation additionally receives a
-- 'Collect' value alongside the state.
--
-- Unwrapped and applied by 'collectAndContinueWithState'.
newtype StatefulCollect s n txid tx m
  = StatefulCollect (s -> Collect txid tx -> m (ServerStIdle n txid tx m ()))

-- | After checking that there are no unexpected thunks in the provided state,
-- pass it to the provided function.
--
-- The state is forced to WHNF by the bang pattern before the check; the
-- result of 'unsafeNoThunks', rendered with 'show', is handed to
-- 'checkInvariant' together with the continuation applied to the state.
--
-- See 'checkInvariant' and 'unsafeNoThunks'.
continueWithState :: NoThunks s
                  => Stateful s n txid tx m
                  -> s
                  -> ServerStIdle n txid tx m ()
continueWithState (Stateful f) !st =
    checkInvariant (show <$> unsafeNoThunks st) (f st)

-- | A variant of 'continueWithState' to be more easily utilized with
-- 'serverIdle' and 'serverReqTxIds'.
--
-- The state is forced to WHNF and checked for unexpected thunks with
-- 'unsafeNoThunks' before the monadic continuation is applied to it.
continueWithStateM :: NoThunks s
                   => StatefulM s n txid tx m
                   -> s
                   -> m (ServerStIdle n txid tx m ())
continueWithStateM (StatefulM f) !st =
    checkInvariant (show <$> unsafeNoThunks st) (f st)
{-# NOINLINE continueWithStateM #-}

-- | A variant of 'continueWithState' to be more easily utilized with
-- 'handleReply'.
--
-- The state is forced to WHNF and checked for unexpected thunks with
-- 'unsafeNoThunks'; the continuation is then applied to the state and the
-- collected reply.
collectAndContinueWithState :: NoThunks s
                            => StatefulCollect s n txid tx m
                            -> s
                            -> Collect txid tx
                            -> m (ServerStIdle n txid tx m ())
collectAndContinueWithState (StatefulCollect f) !st c =
    checkInvariant (show <$> unsafeNoThunks st) (f st c)
{-# NOINLINE collectAndContinueWithState #-}