{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# OPTIONS_GHC -Wno-partial-fields #-}
module Ouroboros.Network.TxSubmission.Inbound
( txSubmissionInbound
, TxSubmissionMempoolWriter (..)
, TraceTxSubmissionInbound (..)
, TxSubmissionProtocolError (..)
, ProcessedTxCount (..)
) where
import Data.Foldable as Foldable (foldl', toList)
import Data.List.NonEmpty qualified as NonEmpty
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe
import Data.Sequence.Strict (StrictSeq)
import Data.Sequence.Strict qualified as Seq
import Data.Set qualified as Set
import Data.Word (Word16)
import GHC.Generics (Generic)
import NoThunks.Class (NoThunks (..), unsafeNoThunks)
import Cardano.Prelude (forceElemsToWHNF)
import Control.Concurrent.Class.MonadSTM.Strict.TVar.Checked
import Control.Exception (assert)
import Control.Monad (unless)
import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer, traceWith)
import Network.TypedProtocol.Core (N, Nat (..), natToInt)
import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion)
import Ouroboros.Network.Protocol.Limits
import Ouroboros.Network.Protocol.TxSubmission2.Server
import Ouroboros.Network.Protocol.TxSubmission2.Type
import Ouroboros.Network.TxSubmission.Mempool.Reader (MempoolSnapshot (..),
TxSubmissionMempoolReader (..))
-- | The mempool operations used by the inbound (server) side of the
-- transaction submission protocol handler in this module.
--
-- The @idx@ parameter is not used by any field of this record; it only
-- appears so that the type lines up with 'TxSubmissionMempoolReader'.
--
data TxSubmissionMempoolWriter txid tx idx m =
     TxSubmissionMempoolWriter {

       -- | Compute the transaction identifier of a transaction.
       --
       -- The inbound handler uses this to key the transactions it receives,
       -- so that they can be checked against the identifiers it requested.
       --
       txId          :: tx -> txid,

       -- | Hand a batch of transactions to the mempool.
       --
       -- The result is a list of transaction identifiers; the inbound
       -- handler counts its length as the number of accepted transactions.
       --
       mempoolAddTxs :: [tx] -> m [txid]
    }
-- | Accept / reject breakdown reported through
-- 'TraceTxSubmissionProcessed' after a batch of transactions has been handed
-- to the mempool.
--
data ProcessedTxCount = ProcessedTxCount {
      -- | Number of transactions the mempool just accepted.
      ptxcAccepted :: Int
      -- | Number of transactions counted as rejected.
    , ptxcRejected :: Int
    }
  deriving (Eq, Show)
-- | Events traced by the inbound transaction submission server.
--
-- Neither type parameter is used by any constructor; they are phantom.
--
data TraceTxSubmissionInbound txid tx =
    -- | Number of transactions received in the reply that was just
    -- collected.
    TraceTxSubmissionCollected Int
    -- | Accept / reject breakdown after submitting to the mempool.
  | TraceTxSubmissionProcessed ProcessedTxCount
    -- | Emitted by the action handed to the blocking request for more
    -- transaction identifiers (presumably run when the peer ends the
    -- protocol instead of replying -- confirm against the protocol type).
  | TraceTxInboundTerminated
    -- | The idle state found transactions it can still request. The 'Int'
    -- is the current pipelining index @n@ converted with 'natToInt'.
  | TraceTxInboundCanRequestMoreTxs Int
    -- | The idle state found nothing more to request. The 'Int' is the
    -- current pipelining index @n@ converted with 'natToInt'.
  | TraceTxInboundCannotRequestMoreTxs Int
  deriving (Eq, Show)
-- | Protocol violations by the remote peer that the inbound server detects
-- and throws as exceptions.
--
data TxSubmissionProtocolError =
       -- | The peer sent a transaction whose identifier was not among the
       -- ones requested.
       ProtocolErrorTxNotRequested
       -- | The peer sent more transaction identifiers than were requested.
     | ProtocolErrorTxIdsNotRequested
  deriving Show

instance Exception TxSubmissionProtocolError where
  displayException ProtocolErrorTxNotRequested =
      "The peer replied with a transaction we did not ask for."
  displayException ProtocolErrorTxIdsNotRequested =
      "The peer replied with more txids than we asked for."
-- | State threaded through the 'txSubmissionInbound' server implementation.
--
-- All fields are strict.
--
data ServerState txid tx = ServerState {
       -- | Number of transaction identifiers that have been requested from
       -- the peer and not yet replied to. Set when a request is sent and
       -- decremented by the requested amount when the reply is handled.
       --
       requestedTxIdsInFlight :: !Word16,

       -- | Transaction identifiers the peer has told us about and which we
       -- have not yet acknowledged, in the order the peer sent them.
       -- Acknowledgement always consumes a prefix of this sequence.
       --
       unacknowledgedTxIds    :: !(StrictSeq txid),

       -- | Transaction identifiers (with their advertised size) that we
       -- could still request the transaction for. The idle state checks
       -- whether this map is empty to decide what to do next.
       --
       availableTxids         :: !(Map txid SizeInBytes),

       -- | Replies buffered until they can be acknowledged in order.
       --
       -- An identifier mapped to @'Just' tx@ has a downloaded transaction.
       -- An identifier mapped to 'Nothing' must still be acknowledged but
       -- has no transaction attached: either it was requested and the peer
       -- did not include it in the reply, or it was never requested because
       -- the mempool already had it.
       --
       bufferedTxs            :: !(Map txid (Maybe tx)),

       -- | Number of transaction identifiers to acknowledge with the next
       -- request for identifiers. These have already been removed from
       -- 'unacknowledgedTxIds'.
       --
       numTxsToAcknowledge    :: !Word16
     }
  deriving (Show, Generic)

-- | Uses the 'Generic'-based default methods of 'NoThunks'.
instance ( NoThunks txid
         , NoThunks tx
         ) => NoThunks (ServerState txid tx)
-- | The state the inbound server starts in: nothing requested, nothing
-- known, nothing buffered and nothing to acknowledge.
--
-- Written with record syntax so that the two 'Word16' counters and the two
-- maps cannot be swapped by accident.
--
initialServerState :: ServerState txid tx
initialServerState = ServerState {
      requestedTxIdsInFlight = 0,
      unacknowledgedTxIds    = Seq.empty,
      availableTxids         = Map.empty,
      bufferedTxs            = Map.empty,
      numTxsToAcknowledge    = 0
    }
-- | The inbound (server) side of the transaction submission protocol,
-- written as a pipelined server.
--
-- The server starts in the idle state with 'initialServerState' and no
-- pipelined requests outstanding. When the module is compiled with the
-- @TXSUBMISSION_DELAY@ CPP flag it first sleeps for @longWait@ (or @-1@ if
-- that is 'Nothing') before doing anything else.
--
-- The 'NodeToNodeVersion' argument is currently ignored.
--
txSubmissionInbound
  :: forall txid tx idx m.
     ( Ord txid
     , NoThunks txid
     , NoThunks tx
     , MonadSTM m
     , MonadThrow m
     , MonadDelay m
     )
  => Tracer m (TraceTxSubmissionInbound txid tx)
  -> NumTxIdsToAck
     -- ^ Upper bound applied (together with 'maxTxIdsToRequest') to the
     -- number of transaction identifiers requested in one blocking request.
  -> TxSubmissionMempoolReader txid tx idx m
  -> TxSubmissionMempoolWriter txid tx idx m
  -> NodeToNodeVersion
  -> TxSubmissionServerPipelined txid tx m ()
txSubmissionInbound tracer (NumTxIdsToAck maxUnacked) mpReader mpWriter _version =
    TxSubmissionServerPipelined $ do
#ifdef TXSUBMISSION_DELAY
      threadDelay (fromMaybe (-1) longWait)
#endif
      continueWithStateM (serverIdle Zero) initialServerState
  where
    -- Fixed limit on how many transaction identifiers are requested at once.
    maxTxIdsToRequest :: Word16
    maxTxIdsToRequest = 3

    -- Fixed limit on transactions per request; not referenced by the
    -- helpers visible in this part of the file (presumably used by
    -- @serverReqTxs@ -- confirm).
    maxTxToRequest :: Word16
    maxTxToRequest = 2

    -- Bring the mempool operations into scope under their field names.
    TxSubmissionMempoolReader{mempoolGetSnapshot} = mpReader

    TxSubmissionMempoolWriter
      { txId
      , mempoolAddTxs
      } = mpWriter
-- | The idle state of the server. This is a local helper bound in the
-- @where@ clause of 'txSubmissionInbound'; it closes over @tracer@,
-- @maxUnacked@ and @maxTxIdsToRequest@ from that scope.
--
-- The argument @n@ is the type-level pipelining index of the state. What
-- happens next depends on @n@ and on whether 'availableTxids' is empty:
--
-- * @Zero@, something available: continue with @serverReqTxs Zero@.
--
-- * @Zero@, nothing available: send a blocking request for more
--   transaction identifiers, acknowledging 'numTxsToAcknowledge' and asking
--   for @min maxTxIdsToRequest maxUnacked@ of them. The state is asserted
--   to be completely drained at this point.
--
-- * @Succ n'@, something available: offer @serverReqTxs (Succ n')@ as the
--   alternative to collecting a pipelined reply.
--
-- * @Succ n'@, nothing available: only collect a pipelined reply.
--
-- Collected replies are processed by 'handleReply'.
--
serverIdle :: forall (n :: N).
              Nat n
           -> StatefulM (ServerState txid tx) n txid tx m
serverIdle n = StatefulM $ \st -> case n of
    Zero -> do
      if canRequestMoreTxs st
        then do
          traceWith tracer (TraceTxInboundCanRequestMoreTxs (natToInt n))
          pure $ continueWithState (serverReqTxs Zero) st

        else do
          traceWith tracer (TraceTxInboundCannotRequestMoreTxs (natToInt n))
          let numTxIdsToRequest = maxTxIdsToRequest `min` maxUnacked
          -- With @n@ at 'Zero' and nothing available, every part of the
          -- state is expected to be empty.
          assert (requestedTxIdsInFlight st == 0
                && Seq.null (unacknowledgedTxIds st)
                && Map.null (availableTxids st)
                && Map.null (bufferedTxs st)) $
            pure $
            SendMsgRequestTxIdsBlocking
              (NumTxIdsToAck (numTxsToAcknowledge st))
              (NumTxIdsToReq numTxIdsToRequest)
              (traceWith tracer TraceTxInboundTerminated)
              ( collectAndContinueWithState (handleReply Zero) st {
                  -- The pending acknowledgements go out with this request.
                  numTxsToAcknowledge    = 0,
                  requestedTxIdsInFlight = numTxIdsToRequest
                }
              . CollectTxIds (NumTxIdsToReq numTxIdsToRequest)
              . NonEmpty.toList)

    Succ n' ->
      if canRequestMoreTxs st
        then do
          traceWith tracer (TraceTxInboundCanRequestMoreTxs (natToInt n))
          pure $ CollectPipelined
            (Just (continueWithState (serverReqTxs (Succ n')) st))
            (collectAndContinueWithState (handleReply n') st)

        else do
          traceWith tracer (TraceTxInboundCannotRequestMoreTxs (natToInt n))
          -- No alternative is offered, so the only way forward is to
          -- collect a reply.
          pure $ CollectPipelined
            Nothing
            (collectAndContinueWithState (handleReply n') st)
  where
    -- There is something to request exactly when 'availableTxids' is
    -- non-empty.
    canRequestMoreTxs :: ServerState k tx -> Bool
    canRequestMoreTxs st =
        not (Map.null (availableTxids st))
-- | Process one collected reply and return to 'serverIdle'. This is a local
-- helper bound in the @where@ clause of 'txSubmissionInbound'; it closes
-- over @tracer@, @txId@, @mempoolAddTxs@ and @mempoolGetSnapshot@.
--
-- * For a reply carrying transaction identifiers: throw
--   'ProtocolErrorTxIdsNotRequested' if more were sent than requested,
--   reduce 'requestedTxIdsInFlight' by the requested amount, take a mempool
--   snapshot and let 'acknowledgeTxIds' fold the new identifiers into the
--   state.
--
-- * For a reply carrying transactions: throw 'ProtocolErrorTxNotRequested'
--   if any received transaction was not requested, buffer the reply,
--   submit every transaction whose identifier is now in the acknowledgeable
--   prefix of 'unacknowledgedTxIds' to the mempool, and trace the counts.
--
handleReply :: forall (n :: N).
               Nat n
            -> StatefulCollect (ServerState txid tx) n txid tx m
handleReply n = StatefulCollect $ \st collect -> case collect of
    CollectTxIds (NumTxIdsToReq reqNo) txids -> do
      let txidsSeq = Seq.fromList (map fst txids)
          txidsMap = Map.fromList txids

      -- Only an upper bound is checked here; fewer than requested is fine.
      unless (Seq.length txidsSeq <= fromIntegral reqNo) $
        throwIO ProtocolErrorTxIdsNotRequested

      let st' = st {
            requestedTxIdsInFlight = requestedTxIdsInFlight st - reqNo
          }
      mpSnapshot <- atomically mempoolGetSnapshot
      continueWithStateM
        (serverIdle n)
        (acknowledgeTxIds st' txidsSeq txidsMap mpSnapshot)

    CollectTxs txids txs -> do
      let txsMap :: Map txid tx
          txsMap = Map.fromList [ (txId tx, tx) | tx <- txs ]

          txidsReceived  = Map.keysSet txsMap
          txidsRequested = Set.fromList txids

      -- The peer may send a subset of what was asked for, never more.
      unless (txidsReceived `Set.isSubsetOf` txidsRequested) $
        throwIO ProtocolErrorTxNotRequested

          -- Every requested identifier gets an entry: 'Just' the
          -- transaction if it arrived, 'Nothing' otherwise ('<>' on maps is
          -- left-biased, so received transactions win).
      let txIdsRequestedWithTxsReceived :: Map txid (Maybe tx)
          txIdsRequestedWithTxsReceived =
              Map.map Just txsMap
           <> Map.fromSet (const Nothing) txidsRequested

          -- Entries already buffered take precedence over the new ones.
          bufferedTxs1 = bufferedTxs st <> txIdsRequestedWithTxsReceived

          -- The longest prefix of the unacknowledged sequence whose
          -- identifiers are all buffered can now be acknowledged.
          (acknowledgedTxIds, unacknowledgedTxIds') =
            Seq.spanl (`Map.member` bufferedTxs1) (unacknowledgedTxIds st)

          -- The transactions of that prefix, in order. Identifiers buffered
          -- without a transaction contribute nothing. A total lookup is
          -- used; every identifier in the prefix is a member of
          -- 'bufferedTxs1' by construction of the span above.
          txsReady = foldr (\txid r -> case Map.lookup txid bufferedTxs1 of
                                         Just (Just tx) -> tx : r
                                         _              -> r)
                           [] acknowledgedTxIds

          -- Drop the acknowledged identifiers from the buffer ...
          bufferedTxs2 = Foldable.foldl' (flip Map.delete)
                                         bufferedTxs1 acknowledgedTxIds

          -- ... but an identifier that also occurs in the remaining
          -- unacknowledged sequence (a duplicate) is re-added as 'Nothing',
          -- so that its later occurrence can be acknowledged too.
          live = filter (`elem` unacknowledgedTxIds') $ toList acknowledgedTxIds
          bufferedTxs3 = forceElemsToWHNF $ bufferedTxs2 <>
                           Map.fromList (zip live (repeat Nothing))

      let !collected = length txs
      traceWith tracer $
        TraceTxSubmissionCollected collected

      txidsAccepted <- mempoolAddTxs txsReady

      let !accepted = length txidsAccepted

      -- NOTE(review): 'collected' counts the transactions in this reply
      -- while 'accepted' counts acceptances out of 'txsReady'; these are
      -- not necessarily the same batch, so the difference is not guaranteed
      -- to be non-negative.
      traceWith tracer $ TraceTxSubmissionProcessed ProcessedTxCount {
          ptxcAccepted = accepted
        , ptxcRejected = collected - accepted
        }

      continueWithStateM (serverIdle n) st {
        bufferedTxs         = bufferedTxs3,
        unacknowledgedTxIds = unacknowledgedTxIds',
        numTxsToAcknowledge = numTxsToAcknowledge st
                            + fromIntegral (Seq.length acknowledgedTxIds)
      }
-- | Fold a batch of newly received transaction identifiers into the server
-- state. This is a local helper bound in the @where@ clause of
-- 'txSubmissionInbound'.
--
-- Identifiers the mempool snapshot already has are not made available for
-- requesting; instead they are buffered as 'Nothing' so that they can be
-- acknowledged without ever being requested. Any prefix of the extended
-- unacknowledged sequence that is fully buffered is acknowledged right
-- away, increasing 'numTxsToAcknowledge'.
--
-- An empty batch leaves the state unchanged.
--
acknowledgeTxIds :: ServerState txid tx
                 -> StrictSeq txid
                    -- ^ the new identifiers, in the order they were received
                 -> Map txid SizeInBytes
                    -- ^ the same identifiers with their sizes
                 -> MempoolSnapshot txid tx idx
                 -> ServerState txid tx
acknowledgeTxIds st txidsSeq _ _ | Seq.null txidsSeq = st
acknowledgeTxIds st txidsSeq txidsMap MempoolSnapshot{mempoolHasTx} =
    st {
      availableTxids      = availableTxids',
      bufferedTxs         = bufferedTxs'',
      unacknowledgedTxIds = unacknowledgedTxIds'',
      numTxsToAcknowledge = numTxsToAcknowledge st
                          + fromIntegral (Seq.length acknowledgedTxIds)
    }
  where
    -- Split the new identifiers into those already in the mempool
    -- (ignored) and the rest.
    (ignoredTxids, availableTxidsMp) =
      Map.partitionWithKey
        (\txid _ -> mempoolHasTx txid)
        txidsMap

    -- The new identifiers that are not already awaiting acknowledgement.
    availableTxidsU =
      Map.filterWithKey
        (\txid _ -> notElem txid (unacknowledgedTxIds st))
        txidsMap

    -- Only identifiers that pass both filters become available.
    availableTxids' =
         availableTxids st
      <> Map.intersection availableTxidsMp availableTxidsU

    -- Ignored identifiers are buffered without a transaction so that they
    -- can still be acknowledged. Existing buffer entries take precedence.
    bufferedTxs' = bufferedTxs st
                <> Map.map (const Nothing) ignoredTxids

    unacknowledgedTxIds' = unacknowledgedTxIds st <> txidsSeq

    -- The longest fully buffered prefix can be acknowledged now.
    (acknowledgedTxIds, unacknowledgedTxIds'') =
      Seq.spanl (`Map.member` bufferedTxs') unacknowledgedTxIds'

    -- Remove acknowledged identifiers from the buffer, except those that
    -- still occur in the remaining unacknowledged sequence (duplicates).
    bufferedTxs'' =
      forceElemsToWHNF $
        Foldable.foldl'
          (\m txid -> if elem txid unacknowledgedTxIds''
                        then m
                        else Map.delete txid m)
          bufferedTxs' acknowledgedTxIds
-- | Build the pipelined request for transaction bodies.
--
-- The currently available txids are split with 'Map.splitAt': the first
-- @maxTxToRequest@ entries (in key order) are requested now via
-- 'SendMsgRequestTxsPipelined', and the remainder is kept in
-- 'availableTxids' for a later round.
--
-- The continuation is 'serverReqTxIds' at @Succ n@, run on the updated state
-- through 'continueWithStateM'.
serverReqTxs :: forall (n :: N).
                Nat n
             -> Stateful (ServerState txid tx) n txid tx m
serverReqTxs n = Stateful $ \st ->
    let (txsToRequest, availableTxids') =
          Map.splitAt (fromIntegral maxTxToRequest) (availableTxids st)
    in SendMsgRequestTxsPipelined
         (Map.keys txsToRequest)
         (continueWithStateM (serverReqTxIds (Succ n)) st {
            -- Only the txids that were not requested remain available.
            availableTxids = availableTxids'
          })
-- | Request more transaction ids when there is room for them.
--
-- The number to request is the remaining room under @maxUnacked@ (after
-- subtracting the length of 'unacknowledgedTxIds' and the
-- 'requestedTxIdsInFlight' counter), capped at @maxTxIdsToRequest@.
--
-- * If that number is positive, a 'SendMsgRequestTxIdsPipelined' is emitted
--   that acknowledges 'numTxsToAcknowledge' txids and requests the computed
--   amount; the state then records the extra in-flight requests, resets
--   'numTxsToAcknowledge' to zero, and continues with 'serverIdle' at
--   @Succ n@.
-- * Otherwise nothing is sent and the server continues with 'serverIdle' at
--   the same @n@ and an unchanged state.
serverReqTxIds :: forall (n :: N).
                  Nat n
               -> StatefulM (ServerState txid tx) n txid tx m
serverReqTxIds n = StatefulM $ \st -> do
    -- NOTE(review): this is 'Word16' arithmetic, so the subtraction would wrap
    -- around if unacknowledged + in-flight ever exceeded @maxUnacked@;
    -- presumably callers maintain that invariant -- confirm.
    let numTxIdsToRequest :: Word16
        numTxIdsToRequest =
                (maxUnacked
                  - fromIntegral (Seq.length (unacknowledgedTxIds st))
                  - requestedTxIdsInFlight st)
          `min` maxTxIdsToRequest

    if numTxIdsToRequest > 0
      then pure $ SendMsgRequestTxIdsPipelined
             (NumTxIdsToAck (numTxsToAcknowledge st))
             (NumTxIdsToReq numTxIdsToRequest)
             (continueWithStateM (serverIdle (Succ n)) st {
                requestedTxIdsInFlight = requestedTxIdsInFlight st
                                       + numTxIdsToRequest,
                -- The pending acknowledgements were just sent above.
                numTxsToAcknowledge    = 0
              })
      else continueWithStateM (serverIdle n) st
-- | A pure continuation: given a state @s@ it yields the next
-- 'ServerStIdle' step directly.
newtype Stateful s n txid tx m =
    Stateful (s -> ServerStIdle n txid tx m ())
-- | A monadic continuation: given a state @s@ it yields the next
-- 'ServerStIdle' step inside @m@.
newtype StatefulM s n txid tx m =
    StatefulM (s -> m (ServerStIdle n txid tx m ()))
-- | A monadic continuation that, besides the state @s@, also consumes a
-- 'Collect' value before yielding the next 'ServerStIdle' step inside @m@.
newtype StatefulCollect s n txid tx m =
    StatefulCollect
      (s -> Collect txid tx -> m (ServerStIdle n txid tx m ()))
-- | Run a 'Stateful' continuation on the given state.
--
-- The state is forced to WHNF by the bang pattern and inspected with
-- 'unsafeNoThunks'; the rendered result (@Nothing@ when no thunk is reported)
-- is handed to 'checkInvariant' together with the continuation applied to
-- the state.
continueWithState :: NoThunks s
                  => Stateful s n txid tx m
                  -> s
                  -> ServerStIdle n txid tx m ()
continueWithState (Stateful f) !st =
    checkInvariant (show <$> unsafeNoThunks st) (f st)
-- | Monadic counterpart of running a stateful continuation: run a
-- 'StatefulM' continuation on the given state.
--
-- The state is forced to WHNF by the bang pattern and inspected with
-- 'unsafeNoThunks'; the rendered result (@Nothing@ when no thunk is reported)
-- is handed to 'checkInvariant' together with the monadic continuation
-- applied to the state.
continueWithStateM :: NoThunks s
                   => StatefulM s n txid tx m
                   -> s
                   -> m (ServerStIdle n txid tx m ())
continueWithStateM (StatefulM f) !st =
    checkInvariant (show <$> unsafeNoThunks st) (f st)
{-# NOINLINE continueWithStateM #-}
-- | Run a 'StatefulCollect' continuation on the given state and collected
-- result.
--
-- The state is forced to WHNF by the bang pattern and inspected with
-- 'unsafeNoThunks'; the rendered result (@Nothing@ when no thunk is reported)
-- is handed to 'checkInvariant' together with the continuation applied to
-- the state and the 'Collect' value. Only the state is checked, not the
-- 'Collect' argument.
collectAndContinueWithState :: NoThunks s
                            => StatefulCollect s n txid tx m
                            -> s
                            -> Collect txid tx
                            -> m (ServerStIdle n txid tx m ())
collectAndContinueWithState (StatefulCollect f) !st c =
    checkInvariant (show <$> unsafeNoThunks st) (f st c)
{-# NOINLINE collectAndContinueWithState #-}