{-# LANGUAGE BangPatterns        #-}
{-# LANGUAGE DataKinds           #-}
{-# LANGUAGE GADTs               #-}
{-# LANGUAGE KindSignatures      #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving  #-}

module Ouroboros.Network.Protocol.TxSubmission2.Examples
  ( txSubmissionClient
  , txSubmissionServer
  , TraceEventClient (..)
  , TraceEventServer (..)
  ) where

import Data.Foldable as Foldable (foldl', toList)
import Data.List.NonEmpty (NonEmpty (..))
import Data.List.NonEmpty qualified as NonEmpty
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Sequence.Strict (StrictSeq)
import Data.Sequence.Strict qualified as Seq
import Data.Word (Word16)

import Control.Exception (assert)
import Control.Monad (when)
import Control.Tracer (Tracer, traceWith)

import Network.TypedProtocol.Pipelined (N, Nat (..))

import Ouroboros.Network.Protocol.TxSubmission2.Client
import Ouroboros.Network.Protocol.TxSubmission2.Server
import Ouroboros.Network.Protocol.TxSubmission2.Type


--
-- Example client
--

data TraceEventClient txid tx =
     EventRecvMsgRequestTxIds (StrictSeq txid) (Map txid tx) [tx] NumTxIdsToAck NumTxIdsToReq
   | EventRecvMsgRequestTxs   (StrictSeq txid) (Map txid tx) [tx] [txid]
  deriving Int -> TraceEventClient txid tx -> ShowS
[TraceEventClient txid tx] -> ShowS
TraceEventClient txid tx -> String
(Int -> TraceEventClient txid tx -> ShowS)
-> (TraceEventClient txid tx -> String)
-> ([TraceEventClient txid tx] -> ShowS)
-> Show (TraceEventClient txid tx)
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
forall txid tx.
(Show txid, Show tx) =>
Int -> TraceEventClient txid tx -> ShowS
forall txid tx.
(Show txid, Show tx) =>
[TraceEventClient txid tx] -> ShowS
forall txid tx.
(Show txid, Show tx) =>
TraceEventClient txid tx -> String
$cshowsPrec :: forall txid tx.
(Show txid, Show tx) =>
Int -> TraceEventClient txid tx -> ShowS
showsPrec :: Int -> TraceEventClient txid tx -> ShowS
$cshow :: forall txid tx.
(Show txid, Show tx) =>
TraceEventClient txid tx -> String
show :: TraceEventClient txid tx -> String
$cshowList :: forall txid tx.
(Show txid, Show tx) =>
[TraceEventClient txid tx] -> ShowS
showList :: [TraceEventClient txid tx] -> ShowS
Show

-- | An example @'TxSubmissionClient'@ which sends transactions from a fixed
-- list of transactions.
--
-- It is intended to illustrate the protocol or for use in tests. The client
-- enforces aspects of the protocol. It will fail with a protocol error if
-- the peer asks for a transaction which is not in the unacknowledged set.
-- The unacknowledged set is managed such that things are removed after having
-- been requested. The net effect is that the peer can only ask for
-- * If a server will ask for
-- the same transaction twice.
--
txSubmissionClient
  :: forall txid tx m.
     (Ord txid, Show txid, Monad m)
  => Tracer m (TraceEventClient txid tx)
  -> (tx -> txid)
  -> (tx -> SizeInBytes)
  -> Word16  -- ^ Maximum number of unacknowledged txids allowed
  -> [tx]
  -> TxSubmissionClient txid tx m ()
txSubmissionClient :: forall txid tx (m :: * -> *).
(Ord txid, Show txid, Monad m) =>
Tracer m (TraceEventClient txid tx)
-> (tx -> txid)
-> (tx -> SizeInBytes)
-> Word16
-> [tx]
-> TxSubmissionClient txid tx m ()
txSubmissionClient Tracer m (TraceEventClient txid tx)
tracer tx -> txid
txId tx -> SizeInBytes
txSize Word16
maxUnacked =
    m (ClientStIdle txid tx m ()) -> TxSubmissionClient txid tx m ()
forall txid tx (m :: * -> *) a.
m (ClientStIdle txid tx m a) -> TxSubmissionClient txid tx m a
TxSubmissionClient (m (ClientStIdle txid tx m ()) -> TxSubmissionClient txid tx m ())
-> ([tx] -> m (ClientStIdle txid tx m ()))
-> [tx]
-> TxSubmissionClient txid tx m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ClientStIdle txid tx m () -> m (ClientStIdle txid tx m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ClientStIdle txid tx m () -> m (ClientStIdle txid tx m ()))
-> ([tx] -> ClientStIdle txid tx m ())
-> [tx]
-> m (ClientStIdle txid tx m ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StrictSeq txid -> Map txid tx -> [tx] -> ClientStIdle txid tx m ()
client StrictSeq txid
forall a. StrictSeq a
Seq.empty Map txid tx
forall k a. Map k a
Map.empty
  where
    client :: StrictSeq txid -> Map txid tx -> [tx] -> ClientStIdle txid tx m ()
    client :: StrictSeq txid -> Map txid tx -> [tx] -> ClientStIdle txid tx m ()
client !StrictSeq txid
unackedSeq !Map txid tx
unackedMap [tx]
remainingTxs =
        Bool -> ClientStIdle txid tx m () -> ClientStIdle txid tx m ()
forall a. HasCallStack => Bool -> a -> a
assert Bool
invariant
        ClientStIdle { TokBlockingStyle blocking
-> NumTxIdsToAck
-> NumTxIdsToReq
-> m (ClientStTxIds blocking txid tx m ())
forall (blocking :: StBlockingStyle).
TokBlockingStyle blocking
-> NumTxIdsToAck
-> NumTxIdsToReq
-> m (ClientStTxIds blocking txid tx m ())
recvMsgRequestTxIds :: forall (blocking :: StBlockingStyle).
TokBlockingStyle blocking
-> NumTxIdsToAck
-> NumTxIdsToReq
-> m (ClientStTxIds blocking txid tx m ())
recvMsgRequestTxIds :: forall (blocking :: StBlockingStyle).
TokBlockingStyle blocking
-> NumTxIdsToAck
-> NumTxIdsToReq
-> m (ClientStTxIds blocking txid tx m ())
recvMsgRequestTxIds, [txid] -> m (ClientStTxs txid tx m ())
recvMsgRequestTxs :: [txid] -> m (ClientStTxs txid tx m ())
recvMsgRequestTxs :: [txid] -> m (ClientStTxs txid tx m ())
recvMsgRequestTxs }
      where
        -- The entries in the unackedMap are a subset of those in the
        -- unackedSeq. The sequence is all of them, whereas we remove
        -- entries from the map once they are requested, to enforce
        -- that each tx can be requested at most once.
        invariant :: Bool
invariant =
          (tx -> () -> Bool) -> Map txid tx -> Map txid () -> Bool
forall k a b.
Ord k =>
(a -> b -> Bool) -> Map k a -> Map k b -> Bool
Map.isSubmapOfBy
            (\tx
_ ()
_ -> Bool
True)
            Map txid tx
unackedMap
            ([(txid, ())] -> Map txid ()
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList [ (txid
x, ()) | txid
x <- StrictSeq txid -> [txid]
forall a. StrictSeq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
Foldable.toList StrictSeq txid
unackedSeq ])

        recvMsgRequestTxIds :: forall blocking.
                               TokBlockingStyle blocking
                            -> NumTxIdsToAck
                            -> NumTxIdsToReq
                            -> m (ClientStTxIds blocking txid tx m ())
        recvMsgRequestTxIds :: forall (blocking :: StBlockingStyle).
TokBlockingStyle blocking
-> NumTxIdsToAck
-> NumTxIdsToReq
-> m (ClientStTxIds blocking txid tx m ())
recvMsgRequestTxIds TokBlockingStyle blocking
blocking NumTxIdsToAck
ackNo NumTxIdsToReq
reqNo = do
          Tracer m (TraceEventClient txid tx)
-> TraceEventClient txid tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceEventClient txid tx)
tracer (StrictSeq txid
-> Map txid tx
-> [tx]
-> NumTxIdsToAck
-> NumTxIdsToReq
-> TraceEventClient txid tx
forall txid tx.
StrictSeq txid
-> Map txid tx
-> [tx]
-> NumTxIdsToAck
-> NumTxIdsToReq
-> TraceEventClient txid tx
EventRecvMsgRequestTxIds StrictSeq txid
unackedSeq Map txid tx
unackedMap
                                                     [tx]
remainingTxs NumTxIdsToAck
ackNo NumTxIdsToReq
reqNo)
          Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (NumTxIdsToAck
ackNo NumTxIdsToAck -> NumTxIdsToAck -> Bool
forall a. Ord a => a -> a -> Bool
> Int -> NumTxIdsToAck
forall a b. (Integral a, Num b) => a -> b
fromIntegral (StrictSeq txid -> Int
forall a. StrictSeq a -> Int
Seq.length StrictSeq txid
unackedSeq)) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
            String -> m ()
forall a. HasCallStack => String -> a
error (String -> m ()) -> String -> m ()
forall a b. (a -> b) -> a -> b
$ String
"txSubmissionClientConst.recvMsgRequestTxIds: "
                 String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"peer acknowledged more txids than possible"

          Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (  Int -> Word16
forall a b. (Integral a, Num b) => a -> b
fromIntegral (StrictSeq txid -> Int
forall a. StrictSeq a -> Int
Seq.length StrictSeq txid
unackedSeq)
                Word16 -> Word16 -> Word16
forall a. Num a => a -> a -> a
- NumTxIdsToAck -> Word16
getNumTxIdsToAck NumTxIdsToAck
ackNo
                Word16 -> Word16 -> Word16
forall a. Num a => a -> a -> a
+ NumTxIdsToReq -> Word16
getNumTxIdsToReq NumTxIdsToReq
reqNo
                Word16 -> Word16 -> Bool
forall a. Ord a => a -> a -> Bool
> Word16
maxUnacked) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
            String -> m ()
forall a. HasCallStack => String -> a
error (String -> m ()) -> String -> m ()
forall a b. (a -> b) -> a -> b
$ String
"txSubmissionClientConst.recvMsgRequestTxIds: "
                 String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"peer requested more txids than permitted"

          let unackedSeq' :: StrictSeq txid
unackedSeq' = Int -> StrictSeq txid -> StrictSeq txid
forall a. Int -> StrictSeq a -> StrictSeq a
Seq.drop (NumTxIdsToAck -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral NumTxIdsToAck
ackNo) StrictSeq txid
unackedSeq
              unackedMap' :: Map txid tx
unackedMap' = (Map txid tx -> txid -> Map txid tx)
-> Map txid tx -> StrictSeq txid -> Map txid tx
forall b a. (b -> a -> b) -> b -> StrictSeq a -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
Foldable.foldl' ((txid -> Map txid tx -> Map txid tx)
-> Map txid tx -> txid -> Map txid tx
forall a b c. (a -> b -> c) -> b -> a -> c
flip txid -> Map txid tx -> Map txid tx
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete) Map txid tx
unackedMap
                                   (Int -> StrictSeq txid -> StrictSeq txid
forall a. Int -> StrictSeq a -> StrictSeq a
Seq.take (NumTxIdsToAck -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral NumTxIdsToAck
ackNo) StrictSeq txid
unackedSeq)

          case TokBlockingStyle blocking
blocking of
            TokBlockingStyle blocking
TokBlocking | Bool -> Bool
not (StrictSeq txid -> Bool
forall a. StrictSeq a -> Bool
Seq.null StrictSeq txid
unackedSeq')
              -> String -> m ()
forall a. HasCallStack => String -> a
error (String -> m ()) -> String -> m ()
forall a b. (a -> b) -> a -> b
$ String
"txSubmissionClientConst.recvMsgRequestTxIds: "
                      String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"peer made a blocking request for more txids when "
                      String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"there are still unacknowledged txids."
            TokBlockingStyle blocking
_ -> () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()

          -- This example is eager, it always provides as many as asked for,
          -- up to the number remaining available.
          let unackedExtra :: [tx]
unackedExtra   = Int -> [tx] -> [tx]
forall a. Int -> [a] -> [a]
take (NumTxIdsToReq -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral NumTxIdsToReq
reqNo) [tx]
remainingTxs
              unackedSeq'' :: StrictSeq txid
unackedSeq''   = StrictSeq txid
unackedSeq'
                            StrictSeq txid -> StrictSeq txid -> StrictSeq txid
forall a. Semigroup a => a -> a -> a
<> [txid] -> StrictSeq txid
forall a. [a] -> StrictSeq a
Seq.fromList ((tx -> txid) -> [tx] -> [txid]
forall a b. (a -> b) -> [a] -> [b]
map tx -> txid
txId [tx]
unackedExtra)
              unackedMap'' :: Map txid tx
unackedMap''   = Map txid tx
unackedMap'
                            Map txid tx -> Map txid tx -> Map txid tx
forall a. Semigroup a => a -> a -> a
<> [(txid, tx)] -> Map txid tx
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList [ (tx -> txid
txId tx
tx, tx
tx)
                                            | tx
tx <- [tx]
unackedExtra ]
              remainingTxs' :: [tx]
remainingTxs'  = Int -> [tx] -> [tx]
forall a. Int -> [a] -> [a]
drop (NumTxIdsToReq -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral NumTxIdsToReq
reqNo) [tx]
remainingTxs
              txIdAndSize :: tx -> (txid, SizeInBytes)
txIdAndSize tx
tx = (tx -> txid
txId tx
tx, tx -> SizeInBytes
txSize tx
tx)

          ClientStTxIds blocking txid tx m ()
-> m (ClientStTxIds blocking txid tx m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (ClientStTxIds blocking txid tx m ()
 -> m (ClientStTxIds blocking txid tx m ()))
-> ClientStTxIds blocking txid tx m ()
-> m (ClientStTxIds blocking txid tx m ())
forall a b. (a -> b) -> a -> b
$! case (TokBlockingStyle blocking
blocking, [tx]
unackedExtra) of
            (TokBlockingStyle blocking
TokBlocking, []) ->
              () -> ClientStTxIds 'StBlocking txid tx m ()
forall a txid tx (m :: * -> *).
a -> ClientStTxIds 'StBlocking txid tx m a
SendMsgDone ()

            (TokBlockingStyle blocking
TokBlocking, tx
tx:[tx]
txs) ->
              BlockingReplyList blocking (txid, SizeInBytes)
-> ClientStIdle txid tx m () -> ClientStTxIds blocking txid tx m ()
forall (blocking :: StBlockingStyle) txid tx (m :: * -> *) a.
BlockingReplyList blocking (txid, SizeInBytes)
-> ClientStIdle txid tx m a -> ClientStTxIds blocking txid tx m a
SendMsgReplyTxIds
                (NonEmpty (txid, SizeInBytes)
-> BlockingReplyList 'StBlocking (txid, SizeInBytes)
forall a. NonEmpty a -> BlockingReplyList 'StBlocking a
BlockingReply ((tx -> (txid, SizeInBytes))
-> NonEmpty tx -> NonEmpty (txid, SizeInBytes)
forall a b. (a -> b) -> NonEmpty a -> NonEmpty b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap tx -> (txid, SizeInBytes)
txIdAndSize (tx
tx tx -> [tx] -> NonEmpty tx
forall a. a -> [a] -> NonEmpty a
:| [tx]
txs)))
                (StrictSeq txid -> Map txid tx -> [tx] -> ClientStIdle txid tx m ()
client StrictSeq txid
unackedSeq'' Map txid tx
unackedMap'' [tx]
remainingTxs')

            (TokBlockingStyle blocking
TokNonBlocking, [tx]
txs) ->
              BlockingReplyList blocking (txid, SizeInBytes)
-> ClientStIdle txid tx m () -> ClientStTxIds blocking txid tx m ()
forall (blocking :: StBlockingStyle) txid tx (m :: * -> *) a.
BlockingReplyList blocking (txid, SizeInBytes)
-> ClientStIdle txid tx m a -> ClientStTxIds blocking txid tx m a
SendMsgReplyTxIds
                ([(txid, SizeInBytes)]
-> BlockingReplyList 'StNonBlocking (txid, SizeInBytes)
forall a. [a] -> BlockingReplyList 'StNonBlocking a
NonBlockingReply ((tx -> (txid, SizeInBytes)) -> [tx] -> [(txid, SizeInBytes)]
forall a b. (a -> b) -> [a] -> [b]
map tx -> (txid, SizeInBytes)
txIdAndSize [tx]
txs))
                (StrictSeq txid -> Map txid tx -> [tx] -> ClientStIdle txid tx m ()
client StrictSeq txid
unackedSeq'' Map txid tx
unackedMap'' [tx]
remainingTxs')

        recvMsgRequestTxs :: [txid]
                          -> m (ClientStTxs txid tx m ())
        recvMsgRequestTxs :: [txid] -> m (ClientStTxs txid tx m ())
recvMsgRequestTxs [txid]
txids = do
          Tracer m (TraceEventClient txid tx)
-> TraceEventClient txid tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceEventClient txid tx)
tracer (StrictSeq txid
-> Map txid tx -> [tx] -> [txid] -> TraceEventClient txid tx
forall txid tx.
StrictSeq txid
-> Map txid tx -> [tx] -> [txid] -> TraceEventClient txid tx
EventRecvMsgRequestTxs StrictSeq txid
unackedSeq Map txid tx
unackedMap
                                                   [tx]
remainingTxs [txid]
txids)
          case [ txid
txid | txid
txid <- [txid]
txids, txid
txid txid -> Map txid tx -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`Map.notMember` Map txid tx
unackedMap ] of
            [] -> ClientStTxs txid tx m () -> m (ClientStTxs txid tx m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([tx] -> ClientStIdle txid tx m () -> ClientStTxs txid tx m ()
forall tx txid (m :: * -> *) a.
[tx] -> ClientStIdle txid tx m a -> ClientStTxs txid tx m a
SendMsgReplyTxs [tx]
txs ClientStIdle txid tx m ()
client')
              where
                txs :: [tx]
txs         = (txid -> tx) -> [txid] -> [tx]
forall a b. (a -> b) -> [a] -> [b]
map (Map txid tx
unackedMap Map txid tx -> txid -> tx
forall k a. Ord k => Map k a -> k -> a
Map.!) [txid]
txids
                client' :: ClientStIdle txid tx m ()
client'     = StrictSeq txid -> Map txid tx -> [tx] -> ClientStIdle txid tx m ()
client StrictSeq txid
unackedSeq Map txid tx
unackedMap' [tx]
remainingTxs
                unackedMap' :: Map txid tx
unackedMap' = (txid -> Map txid tx -> Map txid tx)
-> Map txid tx -> [txid] -> Map txid tx
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr txid -> Map txid tx -> Map txid tx
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete Map txid tx
unackedMap [txid]
txids
                -- Here we remove from the map, while the seq stays unchanged.
                -- This enforces that each tx can be requested at most once.

            [txid]
missing -> String -> m (ClientStTxs txid tx m ())
forall a. HasCallStack => String -> a
error (String -> m (ClientStTxs txid tx m ()))
-> String -> m (ClientStTxs txid tx m ())
forall a b. (a -> b) -> a -> b
$ String
"txSubmissionClientConst.recvMsgRequestTxs: "
                            String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"requested missing TxIds: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ [txid] -> String
forall a. Show a => a -> String
show [txid]
missing


--
-- Example server
--

data TraceEventServer txid tx =
     EventRequestTxIdsBlocking  (ServerState txid tx) NumTxIdsToAck NumTxIdsToReq
   | EventRequestTxIdsPipelined (ServerState txid tx) NumTxIdsToAck NumTxIdsToReq
   | EventRequestTxsPipelined   (ServerState txid tx) [txid]

deriving instance (Show txid, Show tx) => Show (TraceEventServer txid tx)

data ServerState txid tx = ServerState {
       -- | The number of transaction identifiers that we have requested but
       -- which have not yet been replied to. We need to track this it keep
       -- our requests within the limit on the number of unacknowledged txids.
       --
       forall txid tx. ServerState txid tx -> NumTxIdsToReq
requestedTxIdsInFlight :: NumTxIdsToReq,

       -- | Those transactions (by their identifier) that the client has told
       -- us about, and which we have not yet acknowledged. This is kept in
       -- the order in which the client gave them to us. This is the same order
       -- in which we submit them to the mempool (or for this example, the final
       -- result order). It is also the order we acknowledge in.
       forall txid tx. ServerState txid tx -> StrictSeq txid
unacknowledgedTxIds    :: StrictSeq txid,

       -- | Those transactions (by their identifier) that we can request. These
       -- are a subset of the 'unacknowledgedTxIds' that we have not yet
       -- requested. This is not ordered to illustrate the fact that we can
       -- request txs out of order. We also remember the sizes, though this
       -- example does not make use of the size information.
       forall txid tx. ServerState txid tx -> Map txid SizeInBytes
availableTxids         :: Map txid SizeInBytes,

       -- | Transactions we have successfully downloaded but have not yet added
       -- to the mempool or acknowledged. This is needed because we request
       -- transactions out of order but we must use the original order when
       -- adding to the mempool or acknowledging transactions.
       --
       forall txid tx. ServerState txid tx -> Map txid (Maybe tx)
bufferedTxs            :: Map txid (Maybe tx),

       -- | The number of transactions we can acknowledge on our next request
       -- for more transactions. The number here have already been removed from
       -- 'unacknowledgedTxIds'.
       --
       forall txid tx. ServerState txid tx -> NumTxIdsToAck
numTxsToAcknowledge    :: NumTxIdsToAck
     }
  deriving Int -> ServerState txid tx -> ShowS
[ServerState txid tx] -> ShowS
ServerState txid tx -> String
(Int -> ServerState txid tx -> ShowS)
-> (ServerState txid tx -> String)
-> ([ServerState txid tx] -> ShowS)
-> Show (ServerState txid tx)
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
forall txid tx.
(Show txid, Show tx) =>
Int -> ServerState txid tx -> ShowS
forall txid tx.
(Show txid, Show tx) =>
[ServerState txid tx] -> ShowS
forall txid tx.
(Show txid, Show tx) =>
ServerState txid tx -> String
$cshowsPrec :: forall txid tx.
(Show txid, Show tx) =>
Int -> ServerState txid tx -> ShowS
showsPrec :: Int -> ServerState txid tx -> ShowS
$cshow :: forall txid tx.
(Show txid, Show tx) =>
ServerState txid tx -> String
show :: ServerState txid tx -> String
$cshowList :: forall txid tx.
(Show txid, Show tx) =>
[ServerState txid tx] -> ShowS
showList :: [ServerState txid tx] -> ShowS
Show

initialServerState :: ServerState txid tx
initialServerState :: forall txid tx. ServerState txid tx
initialServerState = NumTxIdsToReq
-> StrictSeq txid
-> Map txid SizeInBytes
-> Map txid (Maybe tx)
-> NumTxIdsToAck
-> ServerState txid tx
forall txid tx.
NumTxIdsToReq
-> StrictSeq txid
-> Map txid SizeInBytes
-> Map txid (Maybe tx)
-> NumTxIdsToAck
-> ServerState txid tx
ServerState NumTxIdsToReq
0 StrictSeq txid
forall a. StrictSeq a
Seq.empty Map txid SizeInBytes
forall k a. Map k a
Map.empty Map txid (Maybe tx)
forall k a. Map k a
Map.empty NumTxIdsToAck
0


-- | An example transaction submission server.
--
-- It collects and returns all the transactions that the client submits. This
-- is suitable for tests and using as a starting template for a full version.
--
-- Note that this example does not respect any overall byte limit on pipelining
-- and does not make any delta-Q info to optimises the pipelining decisions.
--
txSubmissionServer
  :: forall txid tx m.
     (Ord txid, Monad m)
  => Tracer m (TraceEventServer txid tx)
  -> (tx -> txid)
  -> Word16  -- ^ Maximum number of unacknowledged txids
  -> Word16  -- ^ Maximum number of txids to request in any one go
  -> Word16  -- ^ Maximum number of txs to request in any one go
  -> TxSubmissionServerPipelined txid tx m [tx]
txSubmissionServer :: forall txid tx (m :: * -> *).
(Ord txid, Monad m) =>
Tracer m (TraceEventServer txid tx)
-> (tx -> txid)
-> Word16
-> Word16
-> Word16
-> TxSubmissionServerPipelined txid tx m [tx]
txSubmissionServer Tracer m (TraceEventServer txid tx)
tracer tx -> txid
txId Word16
maxUnacked Word16
maxTxIdsToRequest Word16
maxTxToRequest =
    m (ServerStIdle 'Z txid tx m [tx])
-> TxSubmissionServerPipelined txid tx m [tx]
forall (m :: * -> *) txid tx a.
m (ServerStIdle 'Z txid tx m a)
-> TxSubmissionServerPipelined txid tx m a
TxSubmissionServerPipelined (ServerStIdle 'Z txid tx m [tx]
-> m (ServerStIdle 'Z txid tx m [tx])
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ServerStIdle 'Z txid tx m [tx]
 -> m (ServerStIdle 'Z txid tx m [tx]))
-> ServerStIdle 'Z txid tx m [tx]
-> m (ServerStIdle 'Z txid tx m [tx])
forall a b. (a -> b) -> a -> b
$ [tx]
-> Nat 'Z -> ServerState txid tx -> ServerStIdle 'Z txid tx m [tx]
forall (n :: N).
[tx]
-> Nat n -> ServerState txid tx -> ServerStIdle n txid tx m [tx]
serverIdle [] Nat 'Z
forall (n :: N). ('Z ~ n) => Nat n
Zero ServerState txid tx
forall txid tx. ServerState txid tx
initialServerState)
  where
    serverIdle :: forall (n :: N).
                  [tx]
               -> Nat n
               -> ServerState txid tx
               -> ServerStIdle n txid tx m [tx]
    serverIdle :: forall (n :: N).
[tx]
-> Nat n -> ServerState txid tx -> ServerStIdle n txid tx m [tx]
serverIdle [tx]
accum Nat n
Zero ServerState txid tx
st
        -- There are no replies in flight, but we do know some more txs we can
        -- ask for, so lets ask for them and more txids.
      | ServerState txid tx -> Bool
forall k. ServerState k tx -> Bool
canRequestMoreTxs ServerState txid tx
st
      = [tx]
-> Nat n -> ServerState txid tx -> ServerStIdle n txid tx m [tx]
forall (n :: N).
[tx]
-> Nat n -> ServerState txid tx -> ServerStIdle n txid tx m [tx]
serverReqTxs [tx]
accum Nat n
forall (n :: N). ('Z ~ n) => Nat n
Zero ServerState txid tx
st

        -- There's no replies in flight, and we have no more txs we can ask for
        -- so the only remaining thing to do is to ask for more txids. Since
        -- this is the only thing to do now, we make this a blocking call.
      | Bool
otherwise
      , let numTxIdsToRequest :: NumTxIdsToReq
numTxIdsToRequest = Word16 -> NumTxIdsToReq
NumTxIdsToReq (Word16 -> NumTxIdsToReq) -> Word16 -> NumTxIdsToReq
forall a b. (a -> b) -> a -> b
$ Word16
maxTxIdsToRequest Word16 -> Word16 -> Word16
forall a. Ord a => a -> a -> a
`min` Word16
maxUnacked
      = Bool
-> ServerStIdle n txid tx m [tx] -> ServerStIdle n txid tx m [tx]
forall a. HasCallStack => Bool -> a -> a
assert (ServerState txid tx -> NumTxIdsToReq
forall txid tx. ServerState txid tx -> NumTxIdsToReq
requestedTxIdsInFlight ServerState txid tx
st NumTxIdsToReq -> NumTxIdsToReq -> Bool
forall a. Eq a => a -> a -> Bool
== NumTxIdsToReq
0
             Bool -> Bool -> Bool
&& StrictSeq txid -> Bool
forall a. StrictSeq a -> Bool
Seq.null (ServerState txid tx -> StrictSeq txid
forall txid tx. ServerState txid tx -> StrictSeq txid
unacknowledgedTxIds ServerState txid tx
st)
             Bool -> Bool -> Bool
&& Map txid SizeInBytes -> Bool
forall k a. Map k a -> Bool
Map.null (ServerState txid tx -> Map txid SizeInBytes
forall txid tx. ServerState txid tx -> Map txid SizeInBytes
availableTxids ServerState txid tx
st)
             Bool -> Bool -> Bool
&& Map txid (Maybe tx) -> Bool
forall k a. Map k a -> Bool
Map.null (ServerState txid tx -> Map txid (Maybe tx)
forall txid tx. ServerState txid tx -> Map txid (Maybe tx)
bufferedTxs ServerState txid tx
st)) (ServerStIdle n txid tx m [tx] -> ServerStIdle n txid tx m [tx])
-> ServerStIdle n txid tx m [tx] -> ServerStIdle n txid tx m [tx]
forall a b. (a -> b) -> a -> b
$
        NumTxIdsToAck
-> NumTxIdsToReq
-> m [tx]
-> (NonEmpty (txid, SizeInBytes)
    -> m (ServerStIdle 'Z txid tx m [tx]))
-> ServerStIdle 'Z txid tx m [tx]
forall (m :: * -> *) a txid tx.
NumTxIdsToAck
-> NumTxIdsToReq
-> m a
-> (NonEmpty (txid, SizeInBytes)
    -> m (ServerStIdle 'Z txid tx m a))
-> ServerStIdle 'Z txid tx m a
SendMsgRequestTxIdsBlocking
          (ServerState txid tx -> NumTxIdsToAck
forall txid tx. ServerState txid tx -> NumTxIdsToAck
numTxsToAcknowledge ServerState txid tx
st)
          NumTxIdsToReq
numTxIdsToRequest
          ([tx] -> m [tx]
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure [tx]
accum)               -- result if the client reports we're done
          (\NonEmpty (txid, SizeInBytes)
txids -> do
              Tracer m (TraceEventServer txid tx)
-> TraceEventServer txid tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceEventServer txid tx)
tracer (ServerState txid tx
-> NumTxIdsToAck -> NumTxIdsToReq -> TraceEventServer txid tx
forall txid tx.
ServerState txid tx
-> NumTxIdsToAck -> NumTxIdsToReq -> TraceEventServer txid tx
EventRequestTxIdsBlocking ServerState txid tx
st (ServerState txid tx -> NumTxIdsToAck
forall txid tx. ServerState txid tx -> NumTxIdsToAck
numTxsToAcknowledge ServerState txid tx
st) NumTxIdsToReq
numTxIdsToRequest)
              [tx]
-> Nat 'Z
-> ServerState txid tx
-> Collect txid tx
-> m (ServerStIdle 'Z txid tx m [tx])
forall (n :: N).
[tx]
-> Nat n
-> ServerState txid tx
-> Collect txid tx
-> m (ServerStIdle n txid tx m [tx])
handleReply [tx]
accum Nat 'Z
forall (n :: N). ('Z ~ n) => Nat n
Zero ServerState txid tx
st {
                 numTxsToAcknowledge    = 0,
                 requestedTxIdsInFlight = numTxIdsToRequest
               }
               (Collect txid tx -> m (ServerStIdle 'Z txid tx m [tx]))
-> (NonEmpty (txid, SizeInBytes) -> Collect txid tx)
-> NonEmpty (txid, SizeInBytes)
-> m (ServerStIdle 'Z txid tx m [tx])
forall b c a. (b -> c) -> (a -> b) -> a -> c
. NumTxIdsToReq -> [(txid, SizeInBytes)] -> Collect txid tx
forall txid tx.
NumTxIdsToReq -> [(txid, SizeInBytes)] -> Collect txid tx
CollectTxIds NumTxIdsToReq
numTxIdsToRequest
               ([(txid, SizeInBytes)] -> Collect txid tx)
-> (NonEmpty (txid, SizeInBytes) -> [(txid, SizeInBytes)])
-> NonEmpty (txid, SizeInBytes)
-> Collect txid tx
forall b c a. (b -> c) -> (a -> b) -> a -> c
. NonEmpty (txid, SizeInBytes) -> [(txid, SizeInBytes)]
forall a. NonEmpty a -> [a]
NonEmpty.toList (NonEmpty (txid, SizeInBytes)
 -> m (ServerStIdle 'Z txid tx m [tx]))
-> NonEmpty (txid, SizeInBytes)
-> m (ServerStIdle 'Z txid tx m [tx])
forall a b. (a -> b) -> a -> b
$ NonEmpty (txid, SizeInBytes)
txids)

    serverIdle [tx]
accum (Succ Nat n
n) ServerState txid tx
st
        -- We have replies in flight and we should eagerly collect them if
        -- available, but there are transactions to request too so we should
        -- not block waiting for replies.
        --
        -- Having requested more transactions, we opportunistically ask for
        -- more txids in a non-blocking way. This is how we pipeline asking for
        -- both txs and txids.
        --
        -- It's important not to pipeline more requests for txids when we have
        -- no txs to ask for, since (with no other guard) this will put us into
        -- a busy-polling loop.
        --
      | ServerState txid tx -> Bool
forall k. ServerState k tx -> Bool
canRequestMoreTxs ServerState txid tx
st
      = Maybe (ServerStIdle ('S n) txid tx m [tx])
-> (Collect txid tx -> m (ServerStIdle n txid tx m [tx]))
-> ServerStIdle ('S n) txid tx m [tx]
forall (n1 :: N) txid tx (m :: * -> *) a.
Maybe (ServerStIdle ('S n1) txid tx m a)
-> (Collect txid tx -> m (ServerStIdle n1 txid tx m a))
-> ServerStIdle ('S n1) txid tx m a
CollectPipelined
          (ServerStIdle ('S n) txid tx m [tx]
-> Maybe (ServerStIdle ('S n) txid tx m [tx])
forall a. a -> Maybe a
Just ([tx]
-> Nat ('S n)
-> ServerState txid tx
-> ServerStIdle ('S n) txid tx m [tx]
forall (n :: N).
[tx]
-> Nat n -> ServerState txid tx -> ServerStIdle n txid tx m [tx]
serverReqTxs [tx]
accum (Nat n -> Nat ('S n)
forall (m :: N) (n :: N). (m ~ 'S n) => Nat n -> Nat m
Succ Nat n
n) ServerState txid tx
st))
          ([tx]
-> Nat n
-> ServerState txid tx
-> Collect txid tx
-> m (ServerStIdle n txid tx m [tx])
forall (n :: N).
[tx]
-> Nat n
-> ServerState txid tx
-> Collect txid tx
-> m (ServerStIdle n txid tx m [tx])
handleReply [tx]
accum Nat n
n ServerState txid tx
st)

        -- In this case there is nothing else to do so we block until we
        -- collect a reply.
      | Bool
otherwise
      = Maybe (ServerStIdle ('S n) txid tx m [tx])
-> (Collect txid tx -> m (ServerStIdle n txid tx m [tx]))
-> ServerStIdle ('S n) txid tx m [tx]
forall (n1 :: N) txid tx (m :: * -> *) a.
Maybe (ServerStIdle ('S n1) txid tx m a)
-> (Collect txid tx -> m (ServerStIdle n1 txid tx m a))
-> ServerStIdle ('S n1) txid tx m a
CollectPipelined
          Maybe (ServerStIdle ('S n) txid tx m [tx])
forall a. Maybe a
Nothing
          ([tx]
-> Nat n
-> ServerState txid tx
-> Collect txid tx
-> m (ServerStIdle n txid tx m [tx])
forall (n :: N).
[tx]
-> Nat n
-> ServerState txid tx
-> Collect txid tx
-> m (ServerStIdle n txid tx m [tx])
handleReply [tx]
accum Nat n
n ServerState txid tx
st)

    canRequestMoreTxs :: ServerState k tx -> Bool
    canRequestMoreTxs :: forall k. ServerState k tx -> Bool
canRequestMoreTxs ServerState k tx
st =
        Bool -> Bool
not (Map k SizeInBytes -> Bool
forall k a. Map k a -> Bool
Map.null (ServerState k tx -> Map k SizeInBytes
forall txid tx. ServerState txid tx -> Map txid SizeInBytes
availableTxids ServerState k tx
st))

    handleReply :: forall (n :: N).
                   [tx]
                -> Nat n
                -> ServerState txid tx
                -> Collect txid tx
                -> m (ServerStIdle n txid tx m [tx])
    handleReply :: forall (n :: N).
[tx]
-> Nat n
-> ServerState txid tx
-> Collect txid tx
-> m (ServerStIdle n txid tx m [tx])
handleReply [tx]
accum Nat n
n ServerState txid tx
st (CollectTxIds NumTxIdsToReq
reqNo [(txid, SizeInBytes)]
txids) =
      -- Upon receiving a batch of new txids we extend our available set,
      -- and extended the unacknowledged sequence.
      ServerStIdle n txid tx m [tx] -> m (ServerStIdle n txid tx m [tx])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (ServerStIdle n txid tx m [tx]
 -> m (ServerStIdle n txid tx m [tx]))
-> ServerStIdle n txid tx m [tx]
-> m (ServerStIdle n txid tx m [tx])
forall a b. (a -> b) -> a -> b
$ [tx]
-> Nat n -> ServerState txid tx -> ServerStIdle n txid tx m [tx]
forall (n :: N).
[tx]
-> Nat n -> ServerState txid tx -> ServerStIdle n txid tx m [tx]
serverIdle [tx]
accum Nat n
n ServerState txid tx
st {
        requestedTxIdsInFlight = requestedTxIdsInFlight st - reqNo,
        unacknowledgedTxIds    = unacknowledgedTxIds st
                              <> Seq.fromList (map fst txids),
        availableTxids         = availableTxids st
                              <> Map.fromList txids
      }

    handleReply [tx]
accum Nat n
n ServerState txid tx
st (CollectTxs [txid]
txids [tx]
txs) =
      -- When we receive a batch of transactions, in general we get a subset of
      -- those that we asked for, with the remainder now deemed unnecessary.
      -- But we still have to acknowledge the txids we were given. This combined
      -- with the fact that we request txs out of order means our bufferedTxs
      -- has to track all the txids we asked for, even though not all have
      -- replies.
      --
      -- We have to update the unacknowledgedTxIds here eagerly and not delay it
      -- to serverReqTxs, otherwise we could end up blocking in serverIdle on
      -- more pipelined results rather than being able to move on.
      ServerStIdle n txid tx m [tx] -> m (ServerStIdle n txid tx m [tx])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (ServerStIdle n txid tx m [tx]
 -> m (ServerStIdle n txid tx m [tx]))
-> ServerStIdle n txid tx m [tx]
-> m (ServerStIdle n txid tx m [tx])
forall a b. (a -> b) -> a -> b
$ [tx]
-> Nat n -> ServerState txid tx -> ServerStIdle n txid tx m [tx]
forall (n :: N).
[tx]
-> Nat n -> ServerState txid tx -> ServerStIdle n txid tx m [tx]
serverIdle [tx]
accum' Nat n
n ServerState txid tx
st {
        bufferedTxs         = bufferedTxs'',
        unacknowledgedTxIds = unacknowledgedTxIds',
        numTxsToAcknowledge = numTxsToAcknowledge st
                            + fromIntegral (Seq.length acknowledgedTxIds)
      }
      where
        txIdsRequestedWithTxsReceived :: [(txid, Maybe tx)]
        txIdsRequestedWithTxsReceived :: [(txid, Maybe tx)]
txIdsRequestedWithTxsReceived =
          [ (txid
txid, Maybe tx
mbTx)
          | let txsMap :: Map txid tx
                txsMap :: Map txid tx
txsMap = [(txid, tx)] -> Map txid tx
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList [ (tx -> txid
txId tx
tx, tx
tx) | tx
tx <- [tx]
txs ]
          , txid
txid <- [txid]
txids
          , let !mbTx :: Maybe tx
mbTx = txid -> Map txid tx -> Maybe tx
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup txid
txid Map txid tx
txsMap
          ]

        bufferedTxs' :: Map txid (Maybe tx)
bufferedTxs'  = ServerState txid tx -> Map txid (Maybe tx)
forall txid tx. ServerState txid tx -> Map txid (Maybe tx)
bufferedTxs ServerState txid tx
st
                     Map txid (Maybe tx) -> Map txid (Maybe tx) -> Map txid (Maybe tx)
forall a. Semigroup a => a -> a -> a
<> [(txid, Maybe tx)] -> Map txid (Maybe tx)
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList [(txid, Maybe tx)]
txIdsRequestedWithTxsReceived

        -- Check if having received more txs we can now confirm any (in strict
        -- order in the unacknowledgedTxIds sequence).
        (StrictSeq txid
acknowledgedTxIds, StrictSeq txid
unacknowledgedTxIds') =
          (txid -> Bool)
-> StrictSeq txid -> (StrictSeq txid, StrictSeq txid)
forall a. (a -> Bool) -> StrictSeq a -> (StrictSeq a, StrictSeq a)
Seq.spanl (txid -> Map txid (Maybe tx) -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`Map.member` Map txid (Maybe tx)
bufferedTxs') (ServerState txid tx -> StrictSeq txid
forall txid tx. ServerState txid tx -> StrictSeq txid
unacknowledgedTxIds ServerState txid tx
st)

        -- If so we can add the acknowledged txs to our accumulating result
        accum' :: [tx]
accum' = [tx]
accum
              [tx] -> [tx] -> [tx]
forall a. [a] -> [a] -> [a]
++ (txid -> [tx] -> [tx]) -> [tx] -> StrictSeq txid -> [tx]
forall a b. (a -> b -> b) -> b -> StrictSeq a -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr (\txid
txid [tx]
r -> [tx] -> (tx -> [tx]) -> Maybe tx -> [tx]
forall b a. b -> (a -> b) -> Maybe a -> b
maybe [tx]
r (tx -> [tx] -> [tx]
forall a. a -> [a] -> [a]
:[tx]
r) (Map txid (Maybe tx)
bufferedTxs' Map txid (Maybe tx) -> txid -> Maybe tx
forall k a. Ord k => Map k a -> k -> a
Map.! txid
txid)) []
                       StrictSeq txid
acknowledgedTxIds

        -- And remove acknowledged txs from our buffer
        bufferedTxs'' :: Map txid (Maybe tx)
bufferedTxs'' = (Map txid (Maybe tx) -> txid -> Map txid (Maybe tx))
-> Map txid (Maybe tx) -> StrictSeq txid -> Map txid (Maybe tx)
forall b a. (b -> a -> b) -> b -> StrictSeq a -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
Foldable.foldl' ((txid -> Map txid (Maybe tx) -> Map txid (Maybe tx))
-> Map txid (Maybe tx) -> txid -> Map txid (Maybe tx)
forall a b c. (a -> b -> c) -> b -> a -> c
flip txid -> Map txid (Maybe tx) -> Map txid (Maybe tx)
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete) Map txid (Maybe tx)
bufferedTxs' StrictSeq txid
acknowledgedTxIds


    serverReqTxs :: forall (n :: N).
                    [tx]
                 -> Nat n
                 -> ServerState txid tx
                 -> ServerStIdle n txid tx m [tx]
    serverReqTxs :: forall (n :: N).
[tx]
-> Nat n -> ServerState txid tx -> ServerStIdle n txid tx m [tx]
serverReqTxs [tx]
accum Nat n
n ServerState txid tx
st =
        [txid]
-> m (ServerStIdle ('S n) txid tx m [tx])
-> ServerStIdle n txid tx m [tx]
forall txid (m :: * -> *) (n :: N) tx a.
[txid]
-> m (ServerStIdle ('S n) txid tx m a)
-> ServerStIdle n txid tx m a
SendMsgRequestTxsPipelined
          (Map txid SizeInBytes -> [txid]
forall k a. Map k a -> [k]
Map.keys Map txid SizeInBytes
txsToRequest)
          (do Tracer m (TraceEventServer txid tx)
-> TraceEventServer txid tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceEventServer txid tx)
tracer (ServerState txid tx -> [txid] -> TraceEventServer txid tx
forall txid tx.
ServerState txid tx -> [txid] -> TraceEventServer txid tx
EventRequestTxsPipelined ServerState txid tx
st (Map txid SizeInBytes -> [txid]
forall k a. Map k a -> [k]
Map.keys Map txid SizeInBytes
txsToRequest))
              ServerStIdle ('S n) txid tx m [tx]
-> m (ServerStIdle ('S n) txid tx m [tx])
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ServerStIdle ('S n) txid tx m [tx]
 -> m (ServerStIdle ('S n) txid tx m [tx]))
-> ServerStIdle ('S n) txid tx m [tx]
-> m (ServerStIdle ('S n) txid tx m [tx])
forall a b. (a -> b) -> a -> b
$ [tx]
-> Nat ('S n)
-> ServerState txid tx
-> ServerStIdle ('S n) txid tx m [tx]
forall (n :: N).
[tx]
-> Nat n -> ServerState txid tx -> ServerStIdle n txid tx m [tx]
serverReqTxIds [tx]
accum (Nat n -> Nat ('S n)
forall (m :: N) (n :: N). (m ~ 'S n) => Nat n -> Nat m
Succ Nat n
n) ServerState txid tx
st {
                availableTxids = availableTxids'
              })
      where
        -- This implementation is deliberately naive, we pick in an arbitrary
        -- order and up to a fixed limit. The real thing should take account of
        -- the expected transaction sizes, to pipeline well and keep within
        -- pipelining byte limits.
        (Map txid SizeInBytes
txsToRequest, Map txid SizeInBytes
availableTxids') =
          Int
-> Map txid SizeInBytes
-> (Map txid SizeInBytes, Map txid SizeInBytes)
forall k a. Int -> Map k a -> (Map k a, Map k a)
Map.splitAt (Word16 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word16
maxTxToRequest) (ServerState txid tx -> Map txid SizeInBytes
forall txid tx. ServerState txid tx -> Map txid SizeInBytes
availableTxids ServerState txid tx
st)

    serverReqTxIds :: forall (n :: N).
                      [tx]
                   -> Nat n
                   -> ServerState txid tx
                   -> ServerStIdle n txid tx m [tx]
    serverReqTxIds :: forall (n :: N).
[tx]
-> Nat n -> ServerState txid tx -> ServerStIdle n txid tx m [tx]
serverReqTxIds [tx]
accum Nat n
n ServerState txid tx
st
      | NumTxIdsToReq
numTxIdsToRequest NumTxIdsToReq -> NumTxIdsToReq -> Bool
forall a. Ord a => a -> a -> Bool
> NumTxIdsToReq
0
      = NumTxIdsToAck
-> NumTxIdsToReq
-> m (ServerStIdle ('S n) txid tx m [tx])
-> ServerStIdle n txid tx m [tx]
forall (m :: * -> *) (n :: N) txid tx a.
NumTxIdsToAck
-> NumTxIdsToReq
-> m (ServerStIdle ('S n) txid tx m a)
-> ServerStIdle n txid tx m a
SendMsgRequestTxIdsPipelined
          (ServerState txid tx -> NumTxIdsToAck
forall txid tx. ServerState txid tx -> NumTxIdsToAck
numTxsToAcknowledge ServerState txid tx
st)
          NumTxIdsToReq
numTxIdsToRequest
          (do Tracer m (TraceEventServer txid tx)
-> TraceEventServer txid tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceEventServer txid tx)
tracer (ServerState txid tx
-> NumTxIdsToAck -> NumTxIdsToReq -> TraceEventServer txid tx
forall txid tx.
ServerState txid tx
-> NumTxIdsToAck -> NumTxIdsToReq -> TraceEventServer txid tx
EventRequestTxIdsPipelined ServerState txid tx
st (ServerState txid tx -> NumTxIdsToAck
forall txid tx. ServerState txid tx -> NumTxIdsToAck
numTxsToAcknowledge ServerState txid tx
st) NumTxIdsToReq
numTxIdsToRequest)
              ServerStIdle ('S n) txid tx m [tx]
-> m (ServerStIdle ('S n) txid tx m [tx])
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ServerStIdle ('S n) txid tx m [tx]
 -> m (ServerStIdle ('S n) txid tx m [tx]))
-> ServerStIdle ('S n) txid tx m [tx]
-> m (ServerStIdle ('S n) txid tx m [tx])
forall a b. (a -> b) -> a -> b
$ [tx]
-> Nat ('S n)
-> ServerState txid tx
-> ServerStIdle ('S n) txid tx m [tx]
forall (n :: N).
[tx]
-> Nat n -> ServerState txid tx -> ServerStIdle n txid tx m [tx]
serverIdle [tx]
accum (Nat n -> Nat ('S n)
forall (m :: N) (n :: N). (m ~ 'S n) => Nat n -> Nat m
Succ Nat n
n) ServerState txid tx
st {
                requestedTxIdsInFlight = requestedTxIdsInFlight st
                                       + numTxIdsToRequest,
                numTxsToAcknowledge    = 0
              })

      | Bool
otherwise
      = [tx]
-> Nat n -> ServerState txid tx -> ServerStIdle n txid tx m [tx]
forall (n :: N).
[tx]
-> Nat n -> ServerState txid tx -> ServerStIdle n txid tx m [tx]
serverIdle [tx]
accum Nat n
n ServerState txid tx
st
      where
        -- This definition is justified by the fact that the
        -- 'numTxsToAcknowledge' are not included in the 'unacknowledgedTxIds'.
        numTxIdsToRequest :: NumTxIdsToReq
numTxIdsToRequest =
          Word16 -> NumTxIdsToReq
NumTxIdsToReq (Word16 -> NumTxIdsToReq) -> Word16 -> NumTxIdsToReq
forall a b. (a -> b) -> a -> b
$
                (Word16
maxUnacked
                  Word16 -> Word16 -> Word16
forall a. Num a => a -> a -> a
- Int -> Word16
forall a b. (Integral a, Num b) => a -> b
fromIntegral (StrictSeq txid -> Int
forall a. StrictSeq a -> Int
Seq.length (ServerState txid tx -> StrictSeq txid
forall txid tx. ServerState txid tx -> StrictSeq txid
unacknowledgedTxIds ServerState txid tx
st))
                  Word16 -> Word16 -> Word16
forall a. Num a => a -> a -> a
- NumTxIdsToReq -> Word16
getNumTxIdsToReq (ServerState txid tx -> NumTxIdsToReq
forall txid tx. ServerState txid tx -> NumTxIdsToReq
requestedTxIdsInFlight ServerState txid tx
st))
          Word16 -> Word16 -> Word16
forall a. Ord a => a -> a -> a
`min` Word16
maxTxIdsToRequest