{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Ouroboros.Network.TxSubmission.Inbound.V2.Registry
( TxChannels (..)
, TxChannelsVar
, TxMempoolSem
, SharedTxStateVar
, newSharedTxStateVar
, newTxChannelsVar
, newTxMempoolSem
, PeerTxAPI (..)
, decisionLogicThreads
, withPeer
) where
import Control.Concurrent.Class.MonadMVar.Strict
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Concurrent.Class.MonadSTM.TSem
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadTimer.SI
import Data.Foldable as Foldable (foldl', traverse_)
import Data.Hashable
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe (fromMaybe)
import Data.Sequence.Strict (StrictSeq)
import Data.Sequence.Strict qualified as StrictSeq
import Data.Set qualified as Set
import Data.Typeable (Typeable)
import Data.Void (Void)
import Control.Tracer (Tracer, traceWith)
import Ouroboros.Network.Protocol.TxSubmission2.Type
import Ouroboros.Network.TxSubmission.Inbound.V2.Decision
import Ouroboros.Network.TxSubmission.Inbound.V2.Policy
import Ouroboros.Network.TxSubmission.Inbound.V2.State
import Ouroboros.Network.TxSubmission.Inbound.V2.Types
import Ouroboros.Network.TxSubmission.Mempool.Reader
-- | Per-peer communication channels: for every registered peer address there
-- is one 'StrictMVar' carrying 'TxDecision' values.  'withPeer' inserts an
-- entry when a peer is registered and deletes it when the peer goes away.
--
newtype TxChannels m peeraddr txid tx = TxChannels {
      -- | Decision channel for each registered peer, keyed by its address.
      txChannelMap :: Map peeraddr (StrictMVar m (TxDecision txid tx))
    }
-- | Mutable, shared holder of the 'TxChannels' registry: a 'StrictMVar'
-- wrapping the current map of per-peer decision channels.
type TxChannelsVar m peeraddr txid tx = StrictMVar m (TxChannels m peeraddr txid tx)
-- | Allocate a fresh 'TxChannelsVar' whose channel map is empty, i.e. with no
-- peers registered yet.
--
newTxChannelsVar :: MonadMVar m => m (TxChannelsVar m peeraddr txid tx)
newTxChannelsVar = newMVar (TxChannels Map.empty)
-- | Opaque wrapper around a 'TSem'.  'withPeer' unwraps it and waits on /
-- signals the semaphore around each submission of a tx to the mempool.
newtype TxMempoolSem m = TxMempoolSem (TSem m)
-- | Create a 'TxMempoolSem' backed by a 'TSem' initialised with a single
-- unit, so at most one holder can be inside the guarded section at a time.
--
newTxMempoolSem :: MonadSTM m => m (TxMempoolSem m)
newTxMempoolSem = TxMempoolSem <$> atomically (newTSem 1)
-- | Record of actions handed to the continuation of 'withPeer'.  All four
-- actions are closures over the peer registered by that 'withPeer' call.
--
data PeerTxAPI m txid tx = PeerTxAPI {
    -- | Blocking read of the next 'TxDecision' for this peer ('withPeer'
    -- implements it as a 'takeMVar' on the peer's channel).
    readTxDecision      :: m (TxDecision txid tx),

    -- | Handle a batch of received txids.  'withPeer' forwards the three
    -- arguments (number of txids requested, the txids as a sequence, and
    -- a map from txid to size) to 'receivedTxIds'.
    handleReceivedTxIds :: NumTxIdsToReq
                        -> StrictSeq txid
                        -> Map txid SizeInBytes
                        -> m (),

    -- | Handle a batch of received txs.  'withPeer' forwards the two maps
    -- (txid to size, and txid to tx) to 'collectTxs'; the result is
    -- 'Just' a protocol error if one was detected.
    handleReceivedTxs   :: Map txid SizeInBytes
                        -> Map txid tx
                        -> m (Maybe TxSubmissionProtocolError),

    -- | Submit the given @(txid, tx)@ to the mempool, tracing the outcome
    -- with the supplied tracer.
    submitTxToMempool   :: Tracer m (TraceTxSubmissionInbound txid tx)
                        -> txid -> tx -> m ()
  }
-- | Outcome of trying to add a single tx to the mempool: 'TxAccepted' when
-- the mempool writer reported it as added, 'TxRejected' when the tx was
-- already in the mempool snapshot or the writer returned no accepted txids.
data TxMempoolResult = TxAccepted | TxRejected
-- | Bracket which registers a peer for the duration of the given
-- continuation and de-registers it afterwards.
--
-- On entry a decision channel for @peeraddr@ is created in the
-- 'TxChannelsVar' (an already existing channel for the same address is
-- reused) and an empty 'PeerTxState' is inserted into the shared state.
-- On exit the peer's state is removed from the shared state, the shared
-- counters which referred to this peer's txs are decremented, and the
-- channel is deleted.
--
-- The 'PeerTxAPI' passed to the continuation closes over the registered
-- peer; its actions call 'error' if the peer is no longer present in the
-- shared state, so it must not be used outside the 'withPeer' scope.
--
withPeer
    :: forall tx peeraddr txid idx m a.
       ( MonadMask m
       , MonadMVar m
       , MonadSTM  m
       , MonadMonotonicTime m
       , Ord txid
       , Show txid
       , Typeable txid
       , Ord peeraddr
       , Show peeraddr
       )
    => Tracer m (TraceTxLogic peeraddr txid tx)
    -> TxChannelsVar m peeraddr txid tx
    -> TxMempoolSem m
    -> TxDecisionPolicy
    -> SharedTxStateVar m peeraddr txid tx
    -> TxSubmissionMempoolReader txid tx idx m
    -> TxSubmissionMempoolWriter txid tx idx m
    -> (tx -> SizeInBytes)
    -- ^ size of a tx
    -> peeraddr
    -- ^ address of the peer being registered
    -> (PeerTxAPI m txid tx -> m a)
    -- ^ continuation which runs while the peer is registered
    -> m a
withPeer tracer
         channelsVar
         (TxMempoolSem mempoolSem)
         policy@TxDecisionPolicy { bufferedTxsMinLifetime }
         sharedStateVar
         TxSubmissionMempoolReader { mempoolGetSnapshot }
         TxSubmissionMempoolWriter { mempoolAddTxs }
         txSize
         peeraddr io =
    bracket
      (do -- create (or reuse) the communication channel for this peer
          !peerTxAPI <-
            modifyMVar channelsVar
              \ TxChannels { txChannelMap } -> do
                chann <- newEmptyMVar
                let (chann', txChannelMap') =
                      Map.alterF (\mbChann ->
                                   -- keep an existing channel if there is one
                                   let !chann'' = fromMaybe chann mbChann
                                   in (chann'', Just chann''))
                                 peeraddr
                                 txChannelMap
                return
                  ( TxChannels { txChannelMap = txChannelMap' }
                  , PeerTxAPI { readTxDecision = takeMVar chann',
                                handleReceivedTxIds,
                                handleReceivedTxs,
                                submitTxToMempool }
                  )

          atomically $ modifyTVar sharedStateVar registerPeer
          return peerTxAPI
      )
      -- de-registration runs under `uninterruptibleMask_`, so it cannot be
      -- cut short by an asynchronous exception
      (\_ -> uninterruptibleMask_ do
        atomically $ modifyTVar sharedStateVar unregisterPeer
        modifyMVar_ channelsVar
          \ TxChannels { txChannelMap } ->
            return TxChannels { txChannelMap = Map.delete peeraddr txChannelMap }
      )
      io
  where
    -- Insert a pristine `PeerTxState` for this peer (overwriting any
    -- previous entry under the same address).
    registerPeer :: SharedTxState peeraddr txid tx
                 -> SharedTxState peeraddr txid tx
    registerPeer st@SharedTxState { peerTxStates } =
      st { peerTxStates =
             Map.insert
               peeraddr
               PeerTxState {
                 availableTxIds           = Map.empty,
                 requestedTxIdsInflight   = 0,
                 requestedTxsInflightSize = 0,
                 requestedTxsInflight     = Set.empty,
                 unacknowledgedTxIds      = StrictSeq.empty,
                 unknownTxs               = Set.empty,
                 score                    = 0,
                 scoreTs                  = Time 0,
                 downloadedTxs            = Map.empty,
                 toMempoolTxs             = Map.empty }
               peerTxStates
         }

    -- Remove this peer's `PeerTxState` and undo its contribution to the
    -- shared counters.  Calls `error` if the peer is not registered.
    unregisterPeer :: SharedTxState peeraddr txid tx
                   -> SharedTxState peeraddr txid tx
    unregisterPeer st@SharedTxState { peerTxStates,
                                      bufferedTxs,
                                      referenceCounts,
                                      inflightTxs,
                                      inflightTxsSize,
                                      inSubmissionToMempoolTxs } =
        st { peerTxStates             = peerTxStates',
             bufferedTxs              = bufferedTxs',
             referenceCounts          = referenceCounts',
             inflightTxs              = inflightTxs',
             inflightTxsSize          = inflightTxsSize',
             inSubmissionToMempoolTxs = inSubmissionToMempoolTxs' }
      where
        -- extract the peer's state and delete it from the map in one pass
        (PeerTxState { unacknowledgedTxIds,
                       requestedTxsInflight,
                       requestedTxsInflightSize,
                       toMempoolTxs }
          , peerTxStates')
          =
          Map.alterF
            (\case
              Nothing -> error ("TxSubmission.withPeer: invariant violation for peer " ++ show peeraddr)
              Just a  -> (a, Nothing))
            peeraddr
            peerTxStates

        -- drop one reference for every txid this peer had not yet
        -- acknowledged; entries reaching zero are removed
        referenceCounts' =
          Foldable.foldl'
            (flip $ Map.update \cnt ->
              if cnt > 1
              then Just $! pred cnt
              else Nothing
            )
            referenceCounts
            unacknowledgedTxIds

        -- txids which are still referenced by some peer
        liveSet = Map.keysSet referenceCounts'

        -- only keep buffered txs which are still referenced
        bufferedTxs' = bufferedTxs
                       `Map.restrictKeys`
                       liveSet

        inflightTxs' = Foldable.foldl' purgeInflightTxs inflightTxs requestedTxsInflight
        inflightTxsSize' = inflightTxsSize - requestedTxsInflightSize

        -- txs queued in this peer's `toMempoolTxs` are no longer counted
        -- as being in submission to the mempool
        inSubmissionToMempoolTxs' =
          Foldable.foldl'
            (flip $ Map.update \cnt ->
              if cnt > 1
              then Just $! pred cnt
              else Nothing
            )
            inSubmissionToMempoolTxs
            (Map.keysSet toMempoolTxs)

        -- decrement the in-flight count of a txid, deleting the entry once
        -- the count would drop below one
        purgeInflightTxs m txid = Map.alter fn txid m
          where
            fn (Just n) | n > 1 = Just $! pred n
            fn _                = Nothing

    -- Add a single tx to the mempool while holding the mempool semaphore,
    -- update the shared state according to the outcome and trace the
    -- result together with the time the attempt took.
    submitTxToMempool :: Tracer m (TraceTxSubmissionInbound txid tx) -> txid -> tx -> m ()
    submitTxToMempool txTracer txid tx =
        bracket_ (atomically $ waitTSem mempoolSem)
                 (atomically $ signalTSem mempoolSem)
          $ do
            start <- getMonotonicTime
            res <- addTx
            end <- getMonotonicTime
            atomically $ modifyTVar sharedStateVar (updateBufferedTx end res)
            let duration = end `diffTime` start
            case res of
              TxAccepted -> traceWith txTracer (TraceTxInboundAddedToMempool [txid] duration)
              TxRejected -> traceWith txTracer (TraceTxInboundRejectedFromMempool [txid] duration)

      where
        -- Try to add the tx to the mempool.  A tx which is already in the
        -- mempool snapshot counts as rejected without calling the writer.
        addTx :: m TxMempoolResult
        addTx = do
          mpSnapshot <- atomically mempoolGetSnapshot

          if mempoolHasTx mpSnapshot txid
             then do
               !now <- getMonotonicTime
               !s <- countRejectedTxs now 1
               traceWith txTracer $ TraceTxSubmissionProcessed ProcessedTxCount {
                    ptxcAccepted = 0
                  , ptxcRejected = 1
                  , ptxcScore    = s
                  }
               return TxRejected
             else do
               acceptedTxs <- mempoolAddTxs [tx]
               end <- getMonotonicTime
               if null acceptedTxs
                  then do
                      !s <- countRejectedTxs end 1
                      traceWith txTracer $ TraceTxSubmissionProcessed ProcessedTxCount {
                          ptxcAccepted = 0
                        , ptxcRejected = 1
                        , ptxcScore    = s
                        }
                      return TxRejected
                  else do
                      -- zero rejections: only lets the peer's score drain
                      !s <- countRejectedTxs end 0
                      traceWith txTracer $ TraceTxSubmissionProcessed ProcessedTxCount {
                          ptxcAccepted = 1
                        , ptxcRejected = 0
                        , ptxcScore    = s
                        }
                      return TxAccepted

        -- Update the shared state after a submission attempt which finished
        -- at the given time.  In both cases the tx leaves this peer's
        -- `toMempoolTxs` and its in-submission count is decremented; an
        -- accepted tx is additionally buffered, gets one more reference
        -- and is scheduled in `timedTxs` at
        -- `addTime bufferedTxsMinLifetime now`.
        updateBufferedTx :: Time
                         -> TxMempoolResult
                         -> SharedTxState peeraddr txid tx
                         -> SharedTxState peeraddr txid tx
        updateBufferedTx _ TxRejected st@SharedTxState { peerTxStates
                                                       , inSubmissionToMempoolTxs } =
            st { peerTxStates = peerTxStates'
               , inSubmissionToMempoolTxs = inSubmissionToMempoolTxs' }
          where
            inSubmissionToMempoolTxs' =
              Map.update (\case 1 -> Nothing; n -> Just $! pred n)
                         txid inSubmissionToMempoolTxs

            peerTxStates' = Map.update fn peeraddr peerTxStates
              where
                fn ps = Just $! ps { toMempoolTxs = Map.delete txid (toMempoolTxs ps)}

        updateBufferedTx now TxAccepted
                         st@SharedTxState { peerTxStates
                                          , bufferedTxs
                                          , referenceCounts
                                          , timedTxs
                                          , inSubmissionToMempoolTxs } =
            st { peerTxStates = peerTxStates'
               , bufferedTxs = bufferedTxs'
               , timedTxs = timedTxs'
               , referenceCounts = referenceCounts'
               , inSubmissionToMempoolTxs = inSubmissionToMempoolTxs'
               }
          where
            inSubmissionToMempoolTxs' =
              Map.update (\case 1 -> Nothing; n -> Just $! pred n)
                         txid inSubmissionToMempoolTxs

            timedTxs' = Map.alter fn (addTime bufferedTxsMinLifetime now) timedTxs
              where
                fn :: Maybe [txid] -> Maybe [txid]
                fn Nothing      = Just [txid]
                fn (Just txids) = Just $! (txid:txids)

            referenceCounts' = Map.alter fn txid referenceCounts
              where
                fn :: Maybe Int -> Maybe Int
                fn Nothing  = Just 1
                fn (Just n) = Just $! succ n

            bufferedTxs' = Map.insert txid (Just tx) bufferedTxs

            peerTxStates' = Map.update fn peeraddr peerTxStates
              where
                fn ps = Just $! ps { toMempoolTxs = Map.delete txid (toMempoolTxs ps)}

    -- Forward received txids for this peer to `receivedTxIds`.
    handleReceivedTxIds :: NumTxIdsToReq
                        -> StrictSeq txid
                        -> Map txid SizeInBytes
                        -> m ()
    handleReceivedTxIds numTxIdsToReq txidsSeq txidsMap =
      receivedTxIds tracer
                    sharedStateVar
                    mempoolGetSnapshot
                    peeraddr
                    numTxIdsToReq
                    txidsSeq
                    txidsMap

    -- Forward received txs for this peer to `collectTxs`.
    handleReceivedTxs :: Map txid SizeInBytes
                      -> Map txid tx
                      -> m (Maybe TxSubmissionProtocolError)
    handleReceivedTxs txids txs =
      collectTxs tracer txSize sharedStateVar peeraddr txids txs

    -- Apply `updateRejects` with the given time and number of rejected
    -- txs to this peer's state and return the resulting score.  Calls
    -- `error` for a negative count or if the peer is not registered.
    countRejectedTxs :: Time
                     -> Double
                     -> m Double
    countRejectedTxs _ n | n < 0 =
      error ("TxSubmission.countRejectedTxs: invariant violation for peer " ++ show peeraddr)
    countRejectedTxs now n = atomically $ stateTVar sharedStateVar $ \st ->
        let (result, peerTxStates') = Map.alterF fn peeraddr (peerTxStates st)
        in (result, st { peerTxStates = peerTxStates' })
      where
        fn :: Maybe (PeerTxState txid tx) -> (Double, Maybe (PeerTxState txid tx))
        fn Nothing   = error ("TxSubmission.withPeer: invariant violation for peer " ++ show peeraddr)
        fn (Just ps) = (score ps', Just $! ps')
          where
            ps' = updateRejects policy now n ps
-- | Update a peer's rejection score at time @now@ with @n@ newly rejected
-- txs.
--
-- * If @n@ is zero and the current score is zero, only the timestamp is
--   refreshed.
-- * Otherwise the old score is first drained by
--   @elapsed time * scoreRate@ (never going below zero), then @n@ is added
--   and the result is capped at @scoreMax@; the timestamp is set to @now@.
--
updateRejects :: TxDecisionPolicy
              -> Time
              -> Double
              -> PeerTxState txid tx
              -> PeerTxState txid tx
updateRejects _ now 0 pts | score pts == 0 = pts {scoreTs = now}
updateRejects TxDecisionPolicy { scoreRate, scoreMax } now n
              pts@PeerTxState { score, scoreTs } =
    let -- time elapsed since the score was last updated
        duration = diffTime now scoreTs
        -- how much the score decays over that period
        !drain   = realToFrac duration * scoreRate
        -- decayed score, clamped at zero
        !drained = max 0 $ score - drain in
    pts { score   = min scoreMax $ drained + n
        , scoreTs = now
        }
-- | Background loop which ticks the shared tx state and periodically drains
-- the rejection score of every peer.
--
-- The thread labels itself @"tx-rejection-drain"@ and then loops forever:
--
-- * it sleeps for one second;
-- * it applies 'tickTimedTxs' to the shared state on every iteration;
-- * once the current time has passed the next scheduled drain (scheduled
--   'drainInterval' after the previous one) it also applies 'updateRejects'
--   with @0@ new rejections to every entry of 'peerTxStates';
-- * the resulting state is written back and traced as 'TraceSharedTxState'.
drainRejectionThread
    :: forall m peeraddr txid tx.
       ( MonadDelay m
       , MonadSTM m
       , MonadThread m
       , Ord txid
       )
    => Tracer m (TraceTxLogic peeraddr txid tx)
    -> TxDecisionPolicy
    -> SharedTxStateVar m peeraddr txid tx
    -> m Void
drainRejectionThread tracer policy sharedStateVar = do
    labelThisThread "tx-rejection-drain"
    now <- getMonotonicTime
    go $ addTime drainInterval now
  where
    -- time between two consecutive score drains
    drainInterval :: DiffTime
    drainInterval = 7

    -- the argument is the time after which the next drain is due
    go :: Time -> m Void
    go !nextDrain = do
      threadDelay 1

      !now <- getMonotonicTime
      -- decided once, outside the transaction, so that the state update and
      -- the rescheduling below agree with each other
      let drainDue :: Bool
          drainDue = now > nextDrain

      newState <- atomically $ do
        st <- readTVar sharedStateVar
        let ptss | drainDue  = Map.map (updateRejects policy now 0)
                                       (peerTxStates st)
                 | otherwise = peerTxStates st
            st' = tickTimedTxs now (st { peerTxStates = ptss })
        writeTVar sharedStateVar st'
        return st'
      traceWith tracer (TraceSharedTxState "drainRejectionThread" newState)

      if drainDue
         then go $ addTime drainInterval now
         else go nextDrain
-- | Background loop which computes tx decisions and hands them to the peers.
--
-- The thread labels itself @"tx-decision"@ and then loops forever.  Each
-- iteration:
--
-- * sleeps for '_DECISION_LOOP_DELAY';
-- * in a single STM transaction reads the shared state, blocks until
--   'filterActivePeers' returns a non-empty map, runs 'makeDecisions' and
--   writes the updated shared state back;
-- * traces the new shared state and the decisions;
-- * for every peer which has both a channel in 'txChannelMap' and a decision,
--   stores the decision in the peer's 'StrictMVar', combining it with an
--   already pending decision using '<>';
-- * traces the counters derived from the new state with
--   'mkTxSubmissionCounters'.
decisionLogicThread
    :: forall m peeraddr txid tx.
       ( MonadDelay m
       , MonadMVar  m
       , MonadSTM   m
       , MonadMask  m
       , MonadFork  m
       , Ord peeraddr
       , Ord txid
       , Hashable peeraddr
       )
    => Tracer m (TraceTxLogic peeraddr txid tx)
    -> Tracer m TxSubmissionCounters
    -> TxDecisionPolicy
    -> TxChannelsVar m peeraddr txid tx
    -> SharedTxStateVar m peeraddr txid tx
    -> m Void
decisionLogicThread tracer counterTracer policy txChannelsVar sharedStateVar = do
    labelThisThread "tx-decision"
    go
  where
    go :: m Void
    go = do
      -- pause between two rounds of decision making
      threadDelay _DECISION_LOOP_DELAY

      (decisions, st) <- atomically do
        sharedTxState <- readTVar sharedStateVar
        let activePeers = filterActivePeers policy sharedTxState

        -- retry the transaction until at least one peer is active
        check (not (Map.null activePeers))

        let (sharedState, decisions) =
              makeDecisions policy sharedTxState activePeers
        writeTVar sharedStateVar sharedState
        return (decisions, sharedState)
      traceWith tracer (TraceSharedTxState "decisionLogicThread" st)
      traceWith tracer (TraceTxDecisions decisions)

      TxChannels { txChannelMap } <- readMVar txChannelsVar
      -- only peers present in both maps receive a decision
      traverse_
        (\(mvar, d) -> modifyMVarWithDefault_ mvar d (\d' -> pure (d' <> d)))
        (Map.intersectionWith (,)
          txChannelMap
          decisions)
      traceWith counterTracer (mkTxSubmissionCounters st)
      go

    -- Update the content of an MVar with the given action, or, if the MVar
    -- is empty, fill it with the default value instead.
    --
    -- The whole operation runs under 'mask'; the update action itself runs
    -- with the outer masking state restored, and if it throws the original
    -- value is put back.
    modifyMVarWithDefault_ :: StrictMVar m a -> a -> (a -> m a) -> m ()
    modifyMVarWithDefault_ mvar d io =
      mask $ \restore -> do
        mbA <- tryTakeMVar mvar
        case mbA of
          Just a -> do
            a' <- restore (io a) `onException` putMVar mvar a
            putMVar mvar a'
          Nothing -> putMVar mvar d
-- | Run 'drainRejectionThread' and 'decisionLogicThread' concurrently.
--
-- Both actions have result type 'Void'; the pair produced by 'concurrently'
-- is collapsed with '<>' only to obtain the required @m Void@ type.
decisionLogicThreads
    :: forall m peeraddr txid tx.
       ( MonadDelay m
       , MonadMVar  m
       , MonadMask  m
       , MonadAsync m
       , MonadFork  m
       , Ord peeraddr
       , Ord txid
       , Hashable peeraddr
       )
    => Tracer m (TraceTxLogic peeraddr txid tx)
    -> Tracer m TxSubmissionCounters
    -> TxDecisionPolicy
    -> TxChannelsVar m peeraddr txid tx
    -> SharedTxStateVar m peeraddr txid tx
    -> m Void
decisionLogicThreads tracer counterTracer policy txChannelsVar sharedStateVar =
    uncurry (<>) <$>
      ( drainRejectionThread tracer policy sharedStateVar
        `concurrently`
        decisionLogicThread tracer counterTracer policy txChannelsVar sharedStateVar
      )
-- | Delay between two iterations of the decision loop: @0.005@ seconds
-- (5ms).
_DECISION_LOOP_DELAY :: DiffTime
_DECISION_LOOP_DELAY = 0.005