{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Ouroboros.Network.TxSubmission.Inbound.V2.State
(
SharedTxState (..)
, PeerTxState (..)
, SharedTxStateVar
, newSharedTxStateVar
, receivedTxIds
, collectTxs
, acknowledgeTxIds
, splitAcknowledgedTxIds
, tickTimedTxs
, const_MAX_TX_SIZE_DISCREPENCY
, RefCountDiff (..)
, updateRefCounts
, receivedTxIdsImpl
, collectTxsImpl
) where
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Exception (assert)
import Control.Monad.Class.MonadTime.SI
import Control.Tracer (Tracer, traceWith)
import Data.Foldable (fold, toList)
import Data.Foldable qualified as Foldable
import Data.Functor (($>))
import Data.Map.Merge.Strict qualified as Map
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe (fromJust, maybeToList)
import Data.Sequence.Strict (StrictSeq)
import Data.Sequence.Strict qualified as StrictSeq
import Data.Set qualified as Set
import Data.Typeable (Typeable)
import GHC.Stack (HasCallStack)
import System.Random (StdGen)
import Ouroboros.Network.Protocol.TxSubmission2.Type (NumTxIdsToAck (..))
import Ouroboros.Network.SizeInBytes (SizeInBytes (..))
import Ouroboros.Network.TxSubmission.Inbound.V2.Policy
import Ouroboros.Network.TxSubmission.Inbound.V2.Types
import Ouroboros.Network.TxSubmission.Mempool.Reader (MempoolSnapshot (..))
-- | Work out which `txid`s of a peer can be acknowledged and how many new
-- ones can be requested, and update the peer's state accordingly.
--
-- The result is a tuple of:
--
-- * the number of `txid`s to acknowledge;
-- * the number of `txid`s to request;
-- * downloaded transactions which can be submitted to the mempool;
-- * the acknowledged `txid`s with their multiplicities;
-- * the updated 'PeerTxState'.
--
-- When no `txid`s can be requested, nothing is acknowledged: both counts are
-- @0@, the 'RefCountDiff' is empty and only `toMempoolTxs` is updated in the
-- returned 'PeerTxState'.
--
-- NOTE(review): presumably acknowledging is tied to requesting because the
-- protocol does not allow a request for zero `txid`s -- confirm against the
-- protocol specification.
acknowledgeTxIds
    :: forall peeraddr tx txid.
       Ord txid
    => HasCallStack
    => TxDecisionPolicy
    -> SharedTxState peeraddr txid tx
    -> PeerTxState txid tx
    -> ( NumTxIdsToAck
       , NumTxIdsToReq
       , TxsToMempool txid tx
       , RefCountDiff txid
       , PeerTxState txid tx
       )
{-# INLINE acknowledgeTxIds #-}
acknowledgeTxIds
    policy
    sharedTxState
    ps@PeerTxState { availableTxIds
                   , unknownTxs
                   , requestedTxIdsInflight
                   , downloadedTxs
                   , score
                   , toMempoolTxs
                   }
    | txIdsToRequest > 0
    = ( txIdsToAcknowledge
      , txIdsToRequest
      , TxsToMempool txsToMempool
      , refCountDiff
      , ps { unacknowledgedTxIds    = unacknowledgedTxIds'
           , availableTxIds         = availableTxIds'
           , unknownTxs             = unknownTxs'
           , requestedTxIdsInflight = requestedTxIdsInflight + txIdsToRequest
           , downloadedTxs          = downloadedTxs'
           , score                  = score'
           , toMempoolTxs           = toMempoolTxs'
           }
      )
    | otherwise
    = ( 0
      , 0
      , TxsToMempool txsToMempool
      , RefCountDiff Map.empty
      , ps { toMempoolTxs = toMempoolTxs' }
      )
  where
    -- Number of `txid`s to request, the prefix of unacknowledged `txid`s
    -- which can be acknowledged, and the remaining unacknowledged `txid`s.
    (txIdsToRequest, acknowledgedTxIds, unacknowledgedTxIds') =
      splitAcknowledgedTxIds policy sharedTxState ps

    -- Longest prefix of the acknowledged `txid`s which were downloaded from
    -- this peer.
    toMempoolTxIds :: StrictSeq txid
    toMempoolTxIds =
      fst (StrictSeq.spanl (`Map.member` downloadedTxs) acknowledgedTxIds)

    -- Downloaded transactions which are not yet in the shared `bufferedTxs`.
    txsToMempool :: [(txid, tx)]
    txsToMempool =
      [ (txid, tx)
      | txid <- toList toMempoolTxIds
      , txid `Map.notMember` bufferedTxs sharedTxState
      , tx <- maybeToList (Map.lookup txid downloadedTxs)
      ]

    txsToMempoolMap :: Map txid tx
    txsToMempoolMap = Map.fromList txsToMempool

    toMempoolTxs' :: Map txid tx
    toMempoolTxs' = toMempoolTxs <> txsToMempoolMap

    -- The `txid`s which remain unacknowledged.
    liveSet :: Set.Set txid
    liveSet = Set.fromList (toList unacknowledgedTxIds')

    -- Downloaded transactions split into the ones which are still
    -- unacknowledged and the ones which are being acknowledged.
    (downloadedTxs', ackedDownloadedTxs) =
      Map.partitionWithKey (\txid _ -> txid `Set.member` liveSet) downloadedTxs

    -- Acknowledged downloads which are not passed to the mempool; each of
    -- them increases the peer's `score` by one.
    lateTxs :: Map txid tx
    lateTxs = ackedDownloadedTxs `Map.difference` txsToMempoolMap

    score' :: Double
    score' = score + fromIntegral (Map.size lateTxs)

    availableTxIds' :: Map txid SizeInBytes
    availableTxIds' = availableTxIds `Map.restrictKeys` liveSet

    unknownTxs' :: Set.Set txid
    unknownTxs' = unknownTxs `Set.intersection` liveSet

    -- Each acknowledged `txid` together with the number of times it is
    -- acknowledged.
    refCountDiff :: RefCountDiff txid
    refCountDiff =
      RefCountDiff $
        Map.fromListWith (+) [ (txid, 1) | txid <- toList acknowledgedTxIds ]

    txIdsToAcknowledge :: NumTxIdsToAck
    txIdsToAcknowledge = fromIntegral (StrictSeq.length acknowledgedTxIds)
-- | Split a peer's `unacknowledgedTxIds` into the longest prefix which can be
-- acknowledged and the remaining suffix, and compute how many new `txid`s can
-- be requested from the peer.
--
-- A `txid` can be acknowledged when it is a key of the shared `bufferedTxs`,
-- or a member of the peer's `unknownTxs`, or a key of the peer's
-- `downloadedTxs`, and at the same time it is not a member of the peer's
-- `requestedTxsInflight`.
--
-- Returns the number of `txid`s to request, the acknowledged prefix and the
-- unacknowledged remainder.
--
-- It is asserted that the number of unacknowledged plus in-flight `txid`s
-- does not exceed `maxUnacknowledgedTxIds`, and that the number of in-flight
-- `txid`s does not exceed `maxNumTxIdsToRequest`.
splitAcknowledgedTxIds
    :: Ord txid
    => HasCallStack
    => TxDecisionPolicy
    -> SharedTxState peer txid tx
    -> PeerTxState txid tx
    -> (NumTxIdsToReq, StrictSeq.StrictSeq txid, StrictSeq.StrictSeq txid)
splitAcknowledgedTxIds
    TxDecisionPolicy { maxUnacknowledgedTxIds
                     , maxNumTxIdsToRequest
                     }
    SharedTxState { bufferedTxs }
    PeerTxState { unacknowledgedTxIds
                , unknownTxs
                , downloadedTxs
                , requestedTxsInflight
                , requestedTxIdsInflight
                }
    =
    (txIdsToRequest, acknowledgedTxIds', unacknowledgedTxIds')
  where
    (acknowledgedTxIds', unacknowledgedTxIds') =
      StrictSeq.spanl canAcknowledge unacknowledgedTxIds

    canAcknowledge txid =
         (  txid `Map.member` bufferedTxs
         || txid `Set.member` unknownTxs
         || txid `Map.member` downloadedTxs
         )
      && txid `Set.notMember` requestedTxsInflight

    numOfUnacked, numOfAcked :: Int
    numOfUnacked = StrictSeq.length unacknowledgedTxIds
    numOfAcked   = StrictSeq.length acknowledgedTxIds'

    unackedAndRequested :: NumTxIdsToReq
    unackedAndRequested = fromIntegral numOfUnacked + requestedTxIdsInflight

    -- The request is limited both by the room left in the unacknowledged
    -- window (after the acknowledged prefix is removed) and by the room left
    -- for in-flight `txid` requests.
    txIdsToRequest :: NumTxIdsToReq
    txIdsToRequest =
        assert (unackedAndRequested <= maxUnacknowledgedTxIds) $
        assert (requestedTxIdsInflight <= maxNumTxIdsToRequest) $
        min windowRoom inflightRoom
      where
        windowRoom   = maxUnacknowledgedTxIds - unackedAndRequested
                     + fromIntegral numOfAcked
        inflightRoom = maxNumTxIdsToRequest - requestedTxIdsInflight
-- | A difference which can be applied to a map of `txid` reference counts
-- with 'updateRefCounts'.
newtype RefCountDiff txid = RefCountDiff {
    txIdsToAck :: Map txid Int
    -- ^ for each `txid`, the amount by which its reference count is decreased
  }
-- | Subtract a 'RefCountDiff' from a map of reference counts.
--
-- * keys present only in the reference counts are kept unchanged;
-- * keys present only in the diff are dropped;
-- * for keys present in both, the diff is subtracted from the count and the
--   entry is removed when the count does not exceed the diff.
--
-- It is asserted that a diff is never larger than the count it is subtracted
-- from.
updateRefCounts :: Ord txid
                => Map txid Int
                -> RefCountDiff txid
                -> Map txid Int
updateRefCounts referenceCounts (RefCountDiff diff) =
    Map.merge Map.preserveMissing
              Map.dropMissing
              (Map.zipWithMaybeMatched subtractCount)
              referenceCounts
              diff
  where
    subtractCount :: k -> Int -> Int -> Maybe Int
    subtractCount _ count delta =
      assert (count >= delta) $
        if count > delta
          then Just $! count - delta
          else Nothing
-- | Expire the entries of `timedTxs` which are strictly earlier than the
-- given time.
--
-- The expired entries are removed from `timedTxs` (an entry at exactly the
-- given time is kept).  Every `txid` listed in an expired entry has its
-- reference count decreased, once per occurrence, using 'updateRefCounts';
-- `bufferedTxs` is then restricted to the `txid`s which still have
-- a reference count.
tickTimedTxs :: forall peeraddr tx txid.
                (Ord txid)
             => Time
             -> SharedTxState peeraddr txid tx
             -> SharedTxState peeraddr txid tx
tickTimedTxs now st@SharedTxState{ timedTxs
                                 , referenceCounts
                                 , bufferedTxs } =
    st { timedTxs        = timedTxs'
       , referenceCounts = referenceCounts'
       , bufferedTxs     = bufferedTxs'
       }
  where
    -- `Map.splitLookup` returns the entry at `now` separately; it is not
    -- expired yet, so it is put back into the remaining map.
    (expiredTxs, timedTxs') =
      case Map.splitLookup now timedTxs of
        (expired, Just txids, later) -> (expired, Map.insert now txids later)
        (expired, Nothing,    later) -> (expired, later)

    -- Number of occurrences of each `txid` in the expired entries.
    refDiff :: Map txid Int
    refDiff = Map.foldl' (Foldable.foldl' countTxId) Map.empty expiredTxs

    countTxId :: Map txid Int -> txid -> Map txid Int
    countTxId counts txid = Map.insertWith (+) txid 1 counts

    referenceCounts' :: Map txid Int
    referenceCounts' = updateRefCounts referenceCounts (RefCountDiff refDiff)

    bufferedTxs' :: Map txid (Maybe tx)
    bufferedTxs' = bufferedTxs `Map.restrictKeys` Map.keysSet referenceCounts'
-- | Record `txid`s received from a peer in the shared state.
--
-- * Received `txid`s for which the mempool predicate holds are added to
--   `bufferedTxs` with a 'Nothing' value (existing entries are kept).
-- * The other received `txid`s are added to the peer's `availableTxIds`,
--   unless they are already among the peer's `unacknowledgedTxIds` or are
--   keys of `bufferedTxs`.
-- * All received `txid`s are appended to the peer's `unacknowledgedTxIds`
--   and their reference counts are increased, once per occurrence in the
--   received sequence.
-- * The peer's `requestedTxIdsInflight` is decreased by the given number of
--   requested `txid`s (asserted not to exceed the current value).
--
-- The peer must be present in `peerTxStates`; otherwise 'fromJust' fails.
receivedTxIdsImpl
    :: forall peeraddr tx txid.
       (Ord txid, Ord peeraddr, HasCallStack)
    => (txid -> Bool)
    -- ^ predicate telling whether the mempool already has a `txid`
    -> peeraddr
    -> NumTxIdsToReq
    -- ^ number of requested `txid`s, subtracted from `requestedTxIdsInflight`
    -> StrictSeq txid
    -- ^ sequence of received `txid`s
    -> Map txid SizeInBytes
    -- ^ received `txid`s with their sizes
    -> SharedTxState peeraddr txid tx
    -> SharedTxState peeraddr txid tx
receivedTxIdsImpl
    mempoolHasTx
    peeraddr reqNo txidsSeq txidsMap
    st@SharedTxState{ peerTxStates
                    , bufferedTxs
                    , referenceCounts }
    =
    -- `alterF` updates the peer's state and returns the new shared state
    -- with a single traversal of `peerTxStates`.
    case Map.alterF (fmap Just . updatePeer . fromJust) peeraddr peerTxStates of
      (st', peerTxStates') -> st' { peerTxStates = peerTxStates' }
  where
    updatePeer :: PeerTxState txid tx
               -> ( SharedTxState peeraddr txid tx
                  , PeerTxState txid tx
                  )
    updatePeer ps@PeerTxState { availableTxIds
                              , requestedTxIdsInflight
                              , unacknowledgedTxIds } =
        (st', ps')
      where
        -- Split the received `txid`s into the ones the mempool already has
        -- and the rest.
        (ignoredTxIds, candidateTxIds) =
          Map.partitionWithKey (\txid _ -> mempoolHasTx txid) txidsMap

        newAvailableTxIds :: Map txid SizeInBytes
        newAvailableTxIds =
          Map.filterWithKey
            (\txid _ -> txid `notElem` unacknowledgedTxIds
                     && txid `Map.notMember` bufferedTxs)
            candidateTxIds

        -- `Map.union` is left biased, so a newly received size replaces
        -- a previously recorded one.
        availableTxIds' :: Map txid SizeInBytes
        availableTxIds' = newAvailableTxIds `Map.union` availableTxIds

        unacknowledgedTxIds' :: StrictSeq txid
        unacknowledgedTxIds' = unacknowledgedTxIds <> txidsSeq

        -- `<>` is left biased as well: entries already buffered are kept.
        bufferedTxs' :: Map txid (Maybe tx)
        bufferedTxs' = bufferedTxs <> Map.map (const Nothing) ignoredTxIds

        referenceCounts' :: Map txid Int
        referenceCounts' =
          Foldable.foldl'
            (\counts txid -> Map.insertWith (+) txid 1 counts)
            referenceCounts
            txidsSeq

        st' :: SharedTxState peeraddr txid tx
        st' = st { bufferedTxs     = bufferedTxs'
                 , referenceCounts = referenceCounts'
                 }

        ps' :: PeerTxState txid tx
        ps' = assert (requestedTxIdsInflight >= reqNo) $
              ps { availableTxIds         = availableTxIds'
                 , unacknowledgedTxIds    = unacknowledgedTxIds'
                 , requestedTxIdsInflight = requestedTxIdsInflight - reqNo
                 }
-- | A size constant of @32@.
--
-- NOTE(review): judging by the name this is the largest tolerated difference
-- between two sizes of a transaction, presumably the advertised and the
-- received one, in bytes -- confirm against its use sites.
const_MAX_TX_SIZE_DISCREPENCY :: SizeInBytes
const_MAX_TX_SIZE_DISCREPENCY = 32
collectTxsImpl
:: forall peeraddr tx txid.
( Ord peeraddr
, Ord txid
, Show txid
, Typeable txid
)
=> (tx -> SizeInBytes)
-> peeraddr
-> Map txid SizeInBytes
-> Map txid tx
-> SharedTxState peeraddr txid tx
-> Either TxSubmissionProtocolError
(SharedTxState peeraddr txid tx)
collectTxsImpl :: forall peeraddr tx txid.
(Ord peeraddr, Ord txid, Show txid, Typeable txid) =>
(tx -> SizeInBytes)
-> peeraddr
-> Map txid SizeInBytes
-> Map txid tx
-> SharedTxState peeraddr txid tx
-> Either
TxSubmissionProtocolError (SharedTxState peeraddr txid tx)
collectTxsImpl tx -> SizeInBytes
txSize peeraddr
peeraddr Map txid SizeInBytes
requestedTxIdsMap Map txid tx
receivedTxs
st :: SharedTxState peeraddr txid tx
st@SharedTxState { Map peeraddr (PeerTxState txid tx)
peerTxStates :: forall peeraddr txid tx.
SharedTxState peeraddr txid tx
-> Map peeraddr (PeerTxState txid tx)
peerTxStates :: Map peeraddr (PeerTxState txid tx)
peerTxStates } =
case (Maybe (PeerTxState txid tx)
-> (Either
[(txid, SizeInBytes, SizeInBytes)]
(SharedTxState peeraddr txid tx),
Maybe (PeerTxState txid tx)))
-> peeraddr
-> Map peeraddr (PeerTxState txid tx)
-> (Either
[(txid, SizeInBytes, SizeInBytes)]
(SharedTxState peeraddr txid tx),
Map peeraddr (PeerTxState txid tx))
forall (f :: * -> *) k a.
(Functor f, Ord k) =>
(Maybe a -> f (Maybe a)) -> k -> Map k a -> f (Map k a)
Map.alterF ((PeerTxState txid tx -> Maybe (PeerTxState txid tx))
-> (Either
[(txid, SizeInBytes, SizeInBytes)]
(SharedTxState peeraddr txid tx),
PeerTxState txid tx)
-> (Either
[(txid, SizeInBytes, SizeInBytes)]
(SharedTxState peeraddr txid tx),
Maybe (PeerTxState txid tx))
forall a b.
(a -> b)
-> (Either
[(txid, SizeInBytes, SizeInBytes)]
(SharedTxState peeraddr txid tx),
a)
-> (Either
[(txid, SizeInBytes, SizeInBytes)]
(SharedTxState peeraddr txid tx),
b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap PeerTxState txid tx -> Maybe (PeerTxState txid tx)
forall a. a -> Maybe a
Just ((Either
[(txid, SizeInBytes, SizeInBytes)]
(SharedTxState peeraddr txid tx),
PeerTxState txid tx)
-> (Either
[(txid, SizeInBytes, SizeInBytes)]
(SharedTxState peeraddr txid tx),
Maybe (PeerTxState txid tx)))
-> (Maybe (PeerTxState txid tx)
-> (Either
[(txid, SizeInBytes, SizeInBytes)]
(SharedTxState peeraddr txid tx),
PeerTxState txid tx))
-> Maybe (PeerTxState txid tx)
-> (Either
[(txid, SizeInBytes, SizeInBytes)]
(SharedTxState peeraddr txid tx),
Maybe (PeerTxState txid tx))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PeerTxState txid tx
-> (Either
[(txid, SizeInBytes, SizeInBytes)]
(SharedTxState peeraddr txid tx),
PeerTxState txid tx)
fn (PeerTxState txid tx
-> (Either
[(txid, SizeInBytes, SizeInBytes)]
(SharedTxState peeraddr txid tx),
PeerTxState txid tx))
-> (Maybe (PeerTxState txid tx) -> PeerTxState txid tx)
-> Maybe (PeerTxState txid tx)
-> (Either
[(txid, SizeInBytes, SizeInBytes)]
(SharedTxState peeraddr txid tx),
PeerTxState txid tx)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe (PeerTxState txid tx) -> PeerTxState txid tx
forall a. HasCallStack => Maybe a -> a
fromJust)
peeraddr
peeraddr
Map peeraddr (PeerTxState txid tx)
peerTxStates of
(Right SharedTxState peeraddr txid tx
st', Map peeraddr (PeerTxState txid tx)
peerTxStates') ->
SharedTxState peeraddr txid tx
-> Either
TxSubmissionProtocolError (SharedTxState peeraddr txid tx)
forall a b. b -> Either a b
Right SharedTxState peeraddr txid tx
st' { peerTxStates = peerTxStates' }
(Left [(txid, SizeInBytes, SizeInBytes)]
e, Map peeraddr (PeerTxState txid tx)
_) ->
TxSubmissionProtocolError
-> Either
TxSubmissionProtocolError (SharedTxState peeraddr txid tx)
forall a b. a -> Either a b
Left (TxSubmissionProtocolError
-> Either
TxSubmissionProtocolError (SharedTxState peeraddr txid tx))
-> TxSubmissionProtocolError
-> Either
TxSubmissionProtocolError (SharedTxState peeraddr txid tx)
forall a b. (a -> b) -> a -> b
$ [(txid, SizeInBytes, SizeInBytes)] -> TxSubmissionProtocolError
forall txid.
(Typeable txid, Show txid, Eq txid) =>
[(txid, SizeInBytes, SizeInBytes)] -> TxSubmissionProtocolError
ProtocolErrorTxSizeError [(txid, SizeInBytes, SizeInBytes)]
e
where
fn :: PeerTxState txid tx
-> ( Either [(txid, SizeInBytes, SizeInBytes)]
(SharedTxState peeraddr txid tx)
, PeerTxState txid tx
)
fn :: PeerTxState txid tx
-> (Either
[(txid, SizeInBytes, SizeInBytes)]
(SharedTxState peeraddr txid tx),
PeerTxState txid tx)
fn PeerTxState txid tx
ps =
case [(txid, SizeInBytes, SizeInBytes)]
wrongSizedTxs of
[] -> ( SharedTxState peeraddr txid tx
-> Either
[(txid, SizeInBytes, SizeInBytes)] (SharedTxState peeraddr txid tx)
forall a b. b -> Either a b
Right SharedTxState peeraddr txid tx
st'
, PeerTxState txid tx
ps''
)
[(txid, SizeInBytes, SizeInBytes)]
_ -> ( [(txid, SizeInBytes, SizeInBytes)]
-> Either
[(txid, SizeInBytes, SizeInBytes)] (SharedTxState peeraddr txid tx)
forall a b. a -> Either a b
Left [(txid, SizeInBytes, SizeInBytes)]
wrongSizedTxs
, PeerTxState txid tx
ps
)
where
wrongSizedTxs :: [(txid, SizeInBytes, SizeInBytes)]
wrongSizedTxs :: [(txid, SizeInBytes, SizeInBytes)]
wrongSizedTxs =
((txid, (SizeInBytes, SizeInBytes))
-> (txid, SizeInBytes, SizeInBytes))
-> [(txid, (SizeInBytes, SizeInBytes))]
-> [(txid, SizeInBytes, SizeInBytes)]
forall a b. (a -> b) -> [a] -> [b]
map (\(txid
a, (SizeInBytes
b,SizeInBytes
c)) -> (txid
a,SizeInBytes
b,SizeInBytes
c))
([(txid, (SizeInBytes, SizeInBytes))]
-> [(txid, SizeInBytes, SizeInBytes)])
-> (Map txid (SizeInBytes, SizeInBytes)
-> [(txid, (SizeInBytes, SizeInBytes))])
-> Map txid (SizeInBytes, SizeInBytes)
-> [(txid, SizeInBytes, SizeInBytes)]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Map txid (SizeInBytes, SizeInBytes)
-> [(txid, (SizeInBytes, SizeInBytes))]
forall k a. Map k a -> [(k, a)]
Map.toList
(Map txid (SizeInBytes, SizeInBytes)
-> [(txid, SizeInBytes, SizeInBytes)])
-> Map txid (SizeInBytes, SizeInBytes)
-> [(txid, SizeInBytes, SizeInBytes)]
forall a b. (a -> b) -> a -> b
$ SimpleWhenMissing txid SizeInBytes (SizeInBytes, SizeInBytes)
-> SimpleWhenMissing txid SizeInBytes (SizeInBytes, SizeInBytes)
-> SimpleWhenMatched
txid SizeInBytes SizeInBytes (SizeInBytes, SizeInBytes)
-> Map txid SizeInBytes
-> Map txid SizeInBytes
-> Map txid (SizeInBytes, SizeInBytes)
forall k a c b.
Ord k =>
SimpleWhenMissing k a c
-> SimpleWhenMissing k b c
-> SimpleWhenMatched k a b c
-> Map k a
-> Map k b
-> Map k c
Map.merge
SimpleWhenMissing txid SizeInBytes (SizeInBytes, SizeInBytes)
forall (f :: * -> *) k x y. Applicative f => WhenMissing f k x y
Map.dropMissing
SimpleWhenMissing txid SizeInBytes (SizeInBytes, SizeInBytes)
forall (f :: * -> *) k x y. Applicative f => WhenMissing f k x y
Map.dropMissing
((txid
-> SizeInBytes -> SizeInBytes -> Maybe (SizeInBytes, SizeInBytes))
-> SimpleWhenMatched
txid SizeInBytes SizeInBytes (SizeInBytes, SizeInBytes)
forall (f :: * -> *) k x y z.
Applicative f =>
(k -> x -> y -> Maybe z) -> WhenMatched f k x y z
Map.zipWithMaybeMatched \txid
_ SizeInBytes
receivedSize SizeInBytes
advertisedSize ->
if SizeInBytes
receivedSize SizeInBytes -> SizeInBytes -> Bool
`checkTxSize` SizeInBytes
advertisedSize
then Maybe (SizeInBytes, SizeInBytes)
forall a. Maybe a
Nothing
else (SizeInBytes, SizeInBytes) -> Maybe (SizeInBytes, SizeInBytes)
forall a. a -> Maybe a
Just (SizeInBytes
receivedSize, SizeInBytes
advertisedSize)
)
(tx -> SizeInBytes
txSize (tx -> SizeInBytes) -> Map txid tx -> Map txid SizeInBytes
forall a b k. (a -> b) -> Map k a -> Map k b
`Map.map` Map txid tx
receivedTxs)
Map txid SizeInBytes
requestedTxIdsMap
checkTxSize :: SizeInBytes
-> SizeInBytes
-> Bool
checkTxSize :: SizeInBytes -> SizeInBytes -> Bool
checkTxSize SizeInBytes
received SizeInBytes
advertised
| SizeInBytes
received SizeInBytes -> SizeInBytes -> Bool
forall a. Ord a => a -> a -> Bool
> SizeInBytes
advertised
= SizeInBytes
received SizeInBytes -> SizeInBytes -> SizeInBytes
forall a. Num a => a -> a -> a
- SizeInBytes
advertised SizeInBytes -> SizeInBytes -> Bool
forall a. Ord a => a -> a -> Bool
<= SizeInBytes
const_MAX_TX_SIZE_DISCREPENCY
| Bool
otherwise
= SizeInBytes
advertised SizeInBytes -> SizeInBytes -> SizeInBytes
forall a. Num a => a -> a -> a
- SizeInBytes
received SizeInBytes -> SizeInBytes -> Bool
forall a. Ord a => a -> a -> Bool
<= SizeInBytes
const_MAX_TX_SIZE_DISCREPENCY
requestedTxIds :: Set txid
requestedTxIds = Map txid SizeInBytes -> Set txid
forall k a. Map k a -> Set k
Map.keysSet Map txid SizeInBytes
requestedTxIdsMap
notReceived :: Set txid
notReceived = Set txid
requestedTxIds Set txid -> Set txid -> Set txid
forall a. Ord a => Set a -> Set a -> Set a
Set.\\ Map txid tx -> Set txid
forall k a. Map k a -> Set k
Map.keysSet Map txid tx
receivedTxs
downloadedTxs' :: Map txid tx
downloadedTxs' = PeerTxState txid tx -> Map txid tx
forall txid tx. PeerTxState txid tx -> Map txid tx
downloadedTxs PeerTxState txid tx
ps Map txid tx -> Map txid tx -> Map txid tx
forall a. Semigroup a => a -> a -> a
<> Map txid tx
receivedTxs
unknownTxs' :: Set txid
unknownTxs' = PeerTxState txid tx -> Set txid
forall txid tx. PeerTxState txid tx -> Set txid
unknownTxs PeerTxState txid tx
ps Set txid -> Set txid -> Set txid
forall a. Semigroup a => a -> a -> a
<> Set txid
notReceived
requestedTxsInflight' :: Set txid
requestedTxsInflight' =
Bool -> Set txid -> Set txid
forall a. HasCallStack => Bool -> a -> a
assert (Set txid
requestedTxIds Set txid -> Set txid -> Bool
forall a. Ord a => Set a -> Set a -> Bool
`Set.isSubsetOf` PeerTxState txid tx -> Set txid
forall txid tx. PeerTxState txid tx -> Set txid
requestedTxsInflight PeerTxState txid tx
ps) (Set txid -> Set txid) -> Set txid -> Set txid
forall a b. (a -> b) -> a -> b
$
PeerTxState txid tx -> Set txid
forall txid tx. PeerTxState txid tx -> Set txid
requestedTxsInflight PeerTxState txid tx
ps Set txid -> Set txid -> Set txid
forall a. Ord a => Set a -> Set a -> Set a
Set.\\ Set txid
requestedTxIds
requestedSize :: SizeInBytes
requestedSize = Map txid SizeInBytes -> SizeInBytes
forall m. Monoid m => Map txid m -> m
forall (t :: * -> *) m. (Foldable t, Monoid m) => t m -> m
fold (Map txid SizeInBytes -> SizeInBytes)
-> Map txid SizeInBytes -> SizeInBytes
forall a b. (a -> b) -> a -> b
$ PeerTxState txid tx -> Map txid SizeInBytes
forall txid tx. PeerTxState txid tx -> Map txid SizeInBytes
availableTxIds PeerTxState txid tx
ps Map txid SizeInBytes -> Set txid -> Map txid SizeInBytes
forall k a. Ord k => Map k a -> Set k -> Map k a
`Map.restrictKeys` Set txid
requestedTxIds
requestedTxsInflightSize' :: SizeInBytes
requestedTxsInflightSize' =
Bool -> SizeInBytes -> SizeInBytes
forall a. HasCallStack => Bool -> a -> a
assert (PeerTxState txid tx -> SizeInBytes
forall txid tx. PeerTxState txid tx -> SizeInBytes
requestedTxsInflightSize PeerTxState txid tx
ps SizeInBytes -> SizeInBytes -> Bool
forall a. Ord a => a -> a -> Bool
>= SizeInBytes
requestedSize) (SizeInBytes -> SizeInBytes) -> SizeInBytes -> SizeInBytes
forall a b. (a -> b) -> a -> b
$
PeerTxState txid tx -> SizeInBytes
forall txid tx. PeerTxState txid tx -> SizeInBytes
requestedTxsInflightSize PeerTxState txid tx
ps SizeInBytes -> SizeInBytes -> SizeInBytes
forall a. Num a => a -> a -> a
- SizeInBytes
requestedSize
inflightTxs'' :: Map txid Int
inflightTxs'' =
SimpleWhenMissing txid Int Int
-> SimpleWhenMissing txid Int Int
-> SimpleWhenMatched txid Int Int Int
-> Map txid Int
-> Map txid Int
-> Map txid Int
forall k a c b.
Ord k =>
SimpleWhenMissing k a c
-> SimpleWhenMissing k b c
-> SimpleWhenMatched k a b c
-> Map k a
-> Map k b
-> Map k c
Map.merge
((txid -> Int -> Maybe Int) -> SimpleWhenMissing txid Int Int
forall (f :: * -> *) k x y.
Applicative f =>
(k -> x -> Maybe y) -> WhenMissing f k x y
Map.mapMaybeMissing \txid
_ Int
x -> Int -> Maybe Int
forall a. a -> Maybe a
Just Int
x)
((txid -> Int -> Maybe Int) -> SimpleWhenMissing txid Int Int
forall (f :: * -> *) k x y.
Applicative f =>
(k -> x -> Maybe y) -> WhenMissing f k x y
Map.mapMaybeMissing \txid
_ Int
_ -> Bool -> Maybe Int -> Maybe Int
forall a. HasCallStack => Bool -> a -> a
assert Bool
False Maybe Int
forall a. Maybe a
Nothing)
((txid -> Int -> Int -> Maybe Int)
-> SimpleWhenMatched txid Int Int Int
forall (f :: * -> *) k x y z.
Applicative f =>
(k -> x -> y -> Maybe z) -> WhenMatched f k x y z
Map.zipWithMaybeMatched \txid
_ Int
x Int
y -> Bool -> Maybe Int -> Maybe Int
forall a. HasCallStack => Bool -> a -> a
assert (Int
x Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
y)
let z :: Int
z = Int
x Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
y in
if Int
z Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
then Int -> Maybe Int
forall a. a -> Maybe a
Just Int
z
else Maybe Int
forall a. Maybe a
Nothing)
(SharedTxState peeraddr txid tx -> Map txid Int
forall peeraddr txid tx.
SharedTxState peeraddr txid tx -> Map txid Int
inflightTxs SharedTxState peeraddr txid tx
st)
((txid -> Int) -> Set txid -> Map txid Int
forall k a. (k -> a) -> Set k -> Map k a
Map.fromSet (Int -> txid -> Int
forall a b. a -> b -> a
const Int
1) Set txid
requestedTxIds)
inflightTxsSize'' :: SizeInBytes
inflightTxsSize'' = Bool -> SizeInBytes -> SizeInBytes
forall a. HasCallStack => Bool -> a -> a
assert (SharedTxState peeraddr txid tx -> SizeInBytes
forall peeraddr txid tx.
SharedTxState peeraddr txid tx -> SizeInBytes
inflightTxsSize SharedTxState peeraddr txid tx
st SizeInBytes -> SizeInBytes -> Bool
forall a. Ord a => a -> a -> Bool
>= SizeInBytes
requestedSize) (SizeInBytes -> SizeInBytes) -> SizeInBytes -> SizeInBytes
forall a b. (a -> b) -> a -> b
$
SharedTxState peeraddr txid tx -> SizeInBytes
forall peeraddr txid tx.
SharedTxState peeraddr txid tx -> SizeInBytes
inflightTxsSize SharedTxState peeraddr txid tx
st SizeInBytes -> SizeInBytes -> SizeInBytes
forall a. Num a => a -> a -> a
- SizeInBytes
requestedSize
st' :: SharedTxState peeraddr txid tx
st' = SharedTxState peeraddr txid tx
st { inflightTxs = inflightTxs'',
inflightTxsSize = inflightTxsSize''
}
availableTxIds'' :: Map txid SizeInBytes
availableTxIds'' = PeerTxState txid tx -> Map txid SizeInBytes
forall txid tx. PeerTxState txid tx -> Map txid SizeInBytes
availableTxIds PeerTxState txid tx
ps
Map txid SizeInBytes -> Set txid -> Map txid SizeInBytes
forall k a. Ord k => Map k a -> Set k -> Map k a
`Map.withoutKeys`
Set txid
requestedTxIds
unknownTxs'' :: Set txid
unknownTxs'' = Set txid
unknownTxs'
Set txid -> Set txid -> Set txid
forall a. Ord a => Set a -> Set a -> Set a
`Set.intersection`
Set txid
live
where
live :: Set txid
live = [txid] -> Set txid
forall a. Ord a => [a] -> Set a
Set.fromList (StrictSeq txid -> [txid]
forall a. StrictSeq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (PeerTxState txid tx -> StrictSeq txid
forall txid tx. PeerTxState txid tx -> StrictSeq txid
unacknowledgedTxIds PeerTxState txid tx
ps))
ps'' :: PeerTxState txid tx
ps'' = PeerTxState txid tx
ps { availableTxIds = availableTxIds'',
unknownTxs = unknownTxs'',
requestedTxsInflightSize = requestedTxsInflightSize',
requestedTxsInflight = requestedTxsInflight',
downloadedTxs = downloadedTxs' }
-- | A 'StrictTVar' in monad @m@ holding a 'SharedTxState'.
--
type SharedTxStateVar m peeraddr txid tx = StrictTVar m (SharedTxState peeraddr txid tx)
-- | Allocate a fresh 'SharedTxStateVar'.
--
-- Every map in the initial 'SharedTxState' is empty, 'inflightTxsSize' is
-- @0@, and 'peerRng' is set to the supplied generator.
--
newSharedTxStateVar :: MonadSTM m
                    => StdGen
                    -- ^ generator stored in the 'peerRng' field
                    -> m (SharedTxStateVar m peeraddr txid tx)
newSharedTxStateVar rng = newTVarIO SharedTxState {
    peerTxStates             = Map.empty,
    inflightTxs              = Map.empty,
    inflightTxsSize          = 0,
    bufferedTxs              = Map.empty,
    referenceCounts          = Map.empty,
    timedTxs                 = Map.empty,
    inSubmissionToMempoolTxs = Map.empty,
    peerRng                  = rng
  }
-- | Update the shared state with the arguments of a received txid reply and
-- trace the result.
--
-- In a single STM transaction this reads the mempool snapshot, applies
-- 'receivedTxIdsImpl' to the content of the shared variable and stores the
-- result.  The updated state is then traced as
-- @'TraceSharedTxState' "receivedTxIds"@ outside of the transaction.
--
receivedTxIds
  :: forall m peeraddr idx tx txid.
     (MonadSTM m, Ord txid, Ord peeraddr)
  => Tracer m (TraceTxLogic peeraddr txid tx)
  -> SharedTxStateVar m peeraddr txid tx
  -> STM m (MempoolSnapshot txid tx idx)
  -- ^ action returning the mempool snapshot; only 'mempoolHasTx' is used
  -> peeraddr
  -> NumTxIdsToReq
  -> StrictSeq txid
  -> Map txid SizeInBytes
  -> m ()
receivedTxIds tracer sharedVar getMempoolSnapshot peeraddr reqNo txidsSeq txidsMap = do
  st <- atomically $ do
    MempoolSnapshot{mempoolHasTx} <- getMempoolSnapshot
    -- 'stateTVar' expects a function returning @(result, newState)@; the
    -- updated state is duplicated so it is both stored and returned.
    stateTVar sharedVar ((\a -> (a,a)) . receivedTxIdsImpl mempoolHasTx peeraddr reqNo txidsSeq txidsMap)
  traceWith tracer (TraceSharedTxState "receivedTxIds" st)
-- | Apply 'collectTxsImpl' to the shared state and report any protocol error.
--
-- The shared variable is read and 'collectTxsImpl' is run inside one STM
-- transaction:
--
-- * on @Right st'@ the new state is written back, traced as
--   @'TraceSharedTxState' "collectTxs"@ and 'Nothing' is returned;
-- * on @Left e@ the shared variable is left unmodified, nothing is traced
--   and @'Just' e@ is returned.
--
collectTxs
  :: forall m peeraddr tx txid.
     (MonadSTM m, Ord txid, Ord peeraddr,
      Show txid, Typeable txid)
  => Tracer m (TraceTxLogic peeraddr txid tx)
  -> (tx -> SizeInBytes)
  -- ^ size of a transaction; passed through to 'collectTxsImpl'
  -> SharedTxStateVar m peeraddr txid tx
  -> peeraddr
  -> Map txid SizeInBytes
  -- ^ requested txids with a size for each
  -> Map txid tx
  -- ^ received txs
  -> m (Maybe TxSubmissionProtocolError)
collectTxs tracer txSize sharedVar peeraddr txidsRequested txsMap = do
  r <- atomically $ do
    st <- readTVar sharedVar
    case collectTxsImpl txSize peeraddr txidsRequested txsMap st of
      -- only commit the new state when the update succeeded
      r@(Right st') -> writeTVar sharedVar st'
                    $> r
      r@Left {}     -> pure r
  case r of
    Right st -> traceWith tracer (TraceSharedTxState "collectTxs" st)
             $> Nothing
    Left e   -> return (Just e)