{-# LANGUAGE BlockArguments      #-}
{-# LANGUAGE LambdaCase          #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE RankNTypes          #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Ouroboros.Network.TxSubmission.Inbound.V2.State
  ( -- * Core API
    SharedTxState (..)
  , PeerTxState (..)
  , SharedTxStateVar
  , newSharedTxStateVar
  , receivedTxIds
  , collectTxs
  , acknowledgeTxIds
  , splitAcknowledgedTxIds
  , tickTimedTxs
  , const_MAX_TX_SIZE_DISCREPENCY
    -- * Internals, only exported for testing purposes:
  , RefCountDiff (..)
  , updateRefCounts
  , receivedTxIdsImpl
  , collectTxsImpl
  ) where

import Control.Concurrent.Class.MonadSTM.Strict
import Control.Exception (assert)
import Control.Monad.Class.MonadTime.SI
import Control.Tracer (Tracer, traceWith)

import Data.Foldable (fold, toList)
import Data.Foldable qualified as Foldable
import Data.Functor (($>))
import Data.Map.Merge.Strict qualified as Map
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe (fromJust, maybeToList)
import Data.Sequence.Strict (StrictSeq)
import Data.Sequence.Strict qualified as StrictSeq
import Data.Set qualified as Set
import Data.Typeable (Typeable)
import GHC.Stack (HasCallStack)
import System.Random (StdGen)

import Ouroboros.Network.Protocol.TxSubmission2.Type (NumTxIdsToAck (..))
import Ouroboros.Network.SizeInBytes (SizeInBytes (..))
import Ouroboros.Network.TxSubmission.Inbound.V2.Policy
import Ouroboros.Network.TxSubmission.Inbound.V2.Types
import Ouroboros.Network.TxSubmission.Mempool.Reader (MempoolSnapshot (..))


--
-- Pure public API
--

acknowledgeTxIds
    :: forall peeraddr tx txid.
       Ord txid
    => HasCallStack
    => TxDecisionPolicy
    -> SharedTxState peeraddr txid tx
    -> PeerTxState txid tx
    -> ( NumTxIdsToAck
       , NumTxIdsToReq
       , TxsToMempool txid tx
       , RefCountDiff txid
       , PeerTxState txid tx
       )
    -- ^ number of txid to acknowledge, requests, txs which we can submit to the
    -- mempool, txids to acknowledge with multiplicities, updated PeerTxState.
{-# INLINE acknowledgeTxIds #-}

acknowledgeTxIds :: forall peeraddr tx txid.
(Ord txid, HasCallStack) =>
TxDecisionPolicy
-> SharedTxState peeraddr txid tx
-> PeerTxState txid tx
-> (NumTxIdsToAck, NumTxIdsToReq, TxsToMempool txid tx,
    RefCountDiff txid, PeerTxState txid tx)
acknowledgeTxIds
    TxDecisionPolicy
policy
    SharedTxState peeraddr txid tx
sharedTxState
    ps :: PeerTxState txid tx
ps@PeerTxState { Map txid SizeInBytes
availableTxIds :: Map txid SizeInBytes
availableTxIds :: forall txid tx. PeerTxState txid tx -> Map txid SizeInBytes
availableTxIds,
                     Set txid
unknownTxs :: Set txid
unknownTxs :: forall txid tx. PeerTxState txid tx -> Set txid
unknownTxs,
                     NumTxIdsToReq
requestedTxIdsInflight :: NumTxIdsToReq
requestedTxIdsInflight :: forall txid tx. PeerTxState txid tx -> NumTxIdsToReq
requestedTxIdsInflight,
                     Map txid tx
downloadedTxs :: Map txid tx
downloadedTxs :: forall txid tx. PeerTxState txid tx -> Map txid tx
downloadedTxs,
                     Double
score :: Double
score :: forall txid tx. PeerTxState txid tx -> Double
score,
                     Map txid tx
toMempoolTxs :: Map txid tx
toMempoolTxs :: forall txid tx. PeerTxState txid tx -> Map txid tx
toMempoolTxs
                   }
    =
    -- We can only acknowledge txids when we can request new ones, since
    -- a `MsgRequestTxIds` for 0 txids is a protocol error.
    if NumTxIdsToReq
txIdsToRequest NumTxIdsToReq -> NumTxIdsToReq -> Bool
forall a. Ord a => a -> a -> Bool
> NumTxIdsToReq
0
      then
      ( NumTxIdsToAck
txIdsToAcknowledge
      , NumTxIdsToReq
txIdsToRequest
      , [(txid, tx)] -> TxsToMempool txid tx
forall txid tx. [(txid, tx)] -> TxsToMempool txid tx
TxsToMempool [(txid, tx)]
txsToMempool
      , RefCountDiff txid
refCountDiff
      , PeerTxState txid tx
ps { unacknowledgedTxIds    = unacknowledgedTxIds',
             availableTxIds         = availableTxIds',
             unknownTxs             = unknownTxs',
             requestedTxIdsInflight = requestedTxIdsInflight
                                    + txIdsToRequest,
             downloadedTxs          = downloadedTxs',
             score                  = score',
             toMempoolTxs           = toMempoolTxs' }
      )
      else
      ( NumTxIdsToAck
0
      , NumTxIdsToReq
0
      , [(txid, tx)] -> TxsToMempool txid tx
forall txid tx. [(txid, tx)] -> TxsToMempool txid tx
TxsToMempool [(txid, tx)]
txsToMempool
      , Map txid Int -> RefCountDiff txid
forall txid. Map txid Int -> RefCountDiff txid
RefCountDiff Map txid Int
forall k a. Map k a
Map.empty
      , PeerTxState txid tx
ps { toMempoolTxs = toMempoolTxs' }
      )
  where
    -- Split `unacknowledgedTxIds'` into the longest prefix of `txid`s which
    -- can be acknowledged and the unacknowledged `txid`s.
    (NumTxIdsToReq
txIdsToRequest, StrictSeq txid
acknowledgedTxIds, StrictSeq txid
unacknowledgedTxIds')
      = TxDecisionPolicy
-> SharedTxState peeraddr txid tx
-> PeerTxState txid tx
-> (NumTxIdsToReq, StrictSeq txid, StrictSeq txid)
forall txid peer tx.
(Ord txid, HasCallStack) =>
TxDecisionPolicy
-> SharedTxState peer txid tx
-> PeerTxState txid tx
-> (NumTxIdsToReq, StrictSeq txid, StrictSeq txid)
splitAcknowledgedTxIds TxDecisionPolicy
policy SharedTxState peeraddr txid tx
sharedTxState PeerTxState txid tx
ps

    txsToMempool :: [(txid, tx)]
txsToMempool = [ (txid
txid, tx
tx)
                   | txid
txid <- StrictSeq txid -> [txid]
forall a. StrictSeq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList StrictSeq txid
toMempoolTxIds
                   , txid
txid txid -> Map txid (Maybe tx) -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`Map.notMember` SharedTxState peeraddr txid tx -> Map txid (Maybe tx)
forall peeraddr txid tx.
SharedTxState peeraddr txid tx -> Map txid (Maybe tx)
bufferedTxs SharedTxState peeraddr txid tx
sharedTxState
                   , tx
tx <- Maybe tx -> [tx]
forall a. Maybe a -> [a]
maybeToList (Maybe tx -> [tx]) -> Maybe tx -> [tx]
forall a b. (a -> b) -> a -> b
$ txid
txid txid -> Map txid tx -> Maybe tx
forall k a. Ord k => k -> Map k a -> Maybe a
`Map.lookup` Map txid tx
downloadedTxs
                   ]
    (StrictSeq txid
toMempoolTxIds, StrictSeq txid
_) =
      (txid -> Bool)
-> StrictSeq txid -> (StrictSeq txid, StrictSeq txid)
forall a. (a -> Bool) -> StrictSeq a -> (StrictSeq a, StrictSeq a)
StrictSeq.spanl (txid -> Map txid tx -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`Map.member` Map txid tx
downloadedTxs) StrictSeq txid
acknowledgedTxIds


    txsToMempoolMap :: Map txid tx
txsToMempoolMap = [(txid, tx)] -> Map txid tx
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList [(txid, tx)]
txsToMempool

    toMempoolTxs' :: Map txid tx
toMempoolTxs' = Map txid tx
toMempoolTxs Map txid tx -> Map txid tx -> Map txid tx
forall a. Semigroup a => a -> a -> a
<> Map txid tx
txsToMempoolMap

    (Map txid tx
downloadedTxs', Map txid tx
ackedDownloadedTxs) = (txid -> tx -> Bool) -> Map txid tx -> (Map txid tx, Map txid tx)
forall k a. (k -> a -> Bool) -> Map k a -> (Map k a, Map k a)
Map.partitionWithKey (\txid
txid tx
_ -> txid
txid txid -> Set txid -> Bool
forall a. Ord a => a -> Set a -> Bool
`Set.member` Set txid
liveSet) Map txid tx
downloadedTxs
    -- latexTxs: transactions which were downloaded by another peer before we
    -- downloaded them; it relies on that `txToMempool` filters out
    -- `bufferedTxs`.
    lateTxs :: Map txid tx
lateTxs = (txid -> tx -> Bool) -> Map txid tx -> Map txid tx
forall k a. (k -> a -> Bool) -> Map k a -> Map k a
Map.filterWithKey (\txid
txid tx
_ -> txid
txid txid -> Map txid tx -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`Map.notMember` Map txid tx
txsToMempoolMap) Map txid tx
ackedDownloadedTxs
    score' :: Double
score' = Double
score Double -> Double -> Double
forall a. Num a => a -> a -> a
+ Int -> Double
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Map txid tx -> Int
forall k a. Map k a -> Int
Map.size Map txid tx
lateTxs)

    -- the set of live `txids`
    liveSet :: Set txid
liveSet  = [txid] -> Set txid
forall a. Ord a => [a] -> Set a
Set.fromList (StrictSeq txid -> [txid]
forall a. StrictSeq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList StrictSeq txid
unacknowledgedTxIds')

    availableTxIds' :: Map txid SizeInBytes
availableTxIds' = Map txid SizeInBytes
availableTxIds
                      Map txid SizeInBytes -> Set txid -> Map txid SizeInBytes
forall k a. Ord k => Map k a -> Set k -> Map k a
`Map.restrictKeys`
                      Set txid
liveSet

    -- We remove all acknowledged `txid`s which are not in
    -- `unacknowledgedTxIds''`, but also return the unknown set before any
    -- modifications (which is used to compute `unacknowledgedTxIds''`
    -- above).
    unknownTxs' :: Set txid
unknownTxs' = Set txid
unknownTxs Set txid -> Set txid -> Set txid
forall a. Ord a => Set a -> Set a -> Set a
`Set.intersection` Set txid
liveSet

    refCountDiff :: RefCountDiff txid
refCountDiff = Map txid Int -> RefCountDiff txid
forall txid. Map txid Int -> RefCountDiff txid
RefCountDiff
                 (Map txid Int -> RefCountDiff txid)
-> Map txid Int -> RefCountDiff txid
forall a b. (a -> b) -> a -> b
$ (txid -> Map txid Int -> Map txid Int)
-> Map txid Int -> StrictSeq txid -> Map txid Int
forall a b. (a -> b -> b) -> b -> StrictSeq a -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr ((Maybe Int -> Maybe Int) -> txid -> Map txid Int -> Map txid Int
forall k a.
Ord k =>
(Maybe a -> Maybe a) -> k -> Map k a -> Map k a
Map.alter Maybe Int -> Maybe Int
fn)
                         Map txid Int
forall k a. Map k a
Map.empty StrictSeq txid
acknowledgedTxIds
      where
        fn :: Maybe Int -> Maybe Int
        fn :: Maybe Int -> Maybe Int
fn Maybe Int
Nothing  = Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1
        fn (Just Int
n) = Int -> Maybe Int
forall a. a -> Maybe a
Just (Int -> Maybe Int) -> Int -> Maybe Int
forall a b. (a -> b) -> a -> b
$! Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1

    txIdsToAcknowledge :: NumTxIdsToAck
    txIdsToAcknowledge :: NumTxIdsToAck
txIdsToAcknowledge = Int -> NumTxIdsToAck
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> NumTxIdsToAck) -> Int -> NumTxIdsToAck
forall a b. (a -> b) -> a -> b
$ StrictSeq txid -> Int
forall a. StrictSeq a -> Int
StrictSeq.length StrictSeq txid
acknowledgedTxIds


-- | Split unacknowledged txids into acknowledged and unacknowledged parts, also
-- return number of txids which can be requested.
--
splitAcknowledgedTxIds
  :: Ord txid
  => HasCallStack
  => TxDecisionPolicy
  -> SharedTxState peer txid tx
  -> PeerTxState  txid tx
  -> (NumTxIdsToReq, StrictSeq.StrictSeq txid, StrictSeq.StrictSeq txid)
  -- ^ number of txids to request, acknowledged txids, unacknowledged txids
splitAcknowledgedTxIds :: forall txid peer tx.
(Ord txid, HasCallStack) =>
TxDecisionPolicy
-> SharedTxState peer txid tx
-> PeerTxState txid tx
-> (NumTxIdsToReq, StrictSeq txid, StrictSeq txid)
splitAcknowledgedTxIds
    TxDecisionPolicy {
      NumTxIdsToReq
maxUnacknowledgedTxIds :: NumTxIdsToReq
maxUnacknowledgedTxIds :: TxDecisionPolicy -> NumTxIdsToReq
maxUnacknowledgedTxIds,
      NumTxIdsToReq
maxNumTxIdsToRequest :: NumTxIdsToReq
maxNumTxIdsToRequest :: TxDecisionPolicy -> NumTxIdsToReq
maxNumTxIdsToRequest
    }
    SharedTxState {
      Map txid (Maybe tx)
bufferedTxs :: forall peeraddr txid tx.
SharedTxState peeraddr txid tx -> Map txid (Maybe tx)
bufferedTxs :: Map txid (Maybe tx)
bufferedTxs
    }
    PeerTxState {
      StrictSeq txid
unacknowledgedTxIds :: forall txid tx. PeerTxState txid tx -> StrictSeq txid
unacknowledgedTxIds :: StrictSeq txid
unacknowledgedTxIds,
      Set txid
unknownTxs :: forall txid tx. PeerTxState txid tx -> Set txid
unknownTxs :: Set txid
unknownTxs,
      Map txid tx
downloadedTxs :: forall txid tx. PeerTxState txid tx -> Map txid tx
downloadedTxs :: Map txid tx
downloadedTxs,
      Set txid
requestedTxsInflight :: Set txid
requestedTxsInflight :: forall txid tx. PeerTxState txid tx -> Set txid
requestedTxsInflight,
      NumTxIdsToReq
requestedTxIdsInflight :: forall txid tx. PeerTxState txid tx -> NumTxIdsToReq
requestedTxIdsInflight :: NumTxIdsToReq
requestedTxIdsInflight
    }
    =
    (NumTxIdsToReq
txIdsToRequest, StrictSeq txid
acknowledgedTxIds', StrictSeq txid
unacknowledgedTxIds')
  where
    (StrictSeq txid
acknowledgedTxIds', StrictSeq txid
unacknowledgedTxIds')
      = (txid -> Bool)
-> StrictSeq txid -> (StrictSeq txid, StrictSeq txid)
forall a. (a -> Bool) -> StrictSeq a -> (StrictSeq a, StrictSeq a)
StrictSeq.spanl (\txid
txid -> (txid
txid txid -> Map txid (Maybe tx) -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`Map.member` Map txid (Maybe tx)
bufferedTxs
                         Bool -> Bool -> Bool
|| txid
txid txid -> Set txid -> Bool
forall a. Ord a => a -> Set a -> Bool
`Set.member` Set txid
unknownTxs
                         Bool -> Bool -> Bool
|| txid
txid txid -> Map txid tx -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`Map.member` Map txid tx
downloadedTxs)
                         Bool -> Bool -> Bool
&& txid
txid txid -> Set txid -> Bool
forall a. Ord a => a -> Set a -> Bool
`Set.notMember` Set txid
requestedTxsInflight
                        )
                        StrictSeq txid
unacknowledgedTxIds
    numOfUnacked :: Int
numOfUnacked = StrictSeq txid -> Int
forall a. StrictSeq a -> Int
StrictSeq.length StrictSeq txid
unacknowledgedTxIds
    numOfAcked :: Int
numOfAcked   = StrictSeq txid -> Int
forall a. StrictSeq a -> Int
StrictSeq.length StrictSeq txid
acknowledgedTxIds'
    unackedAndRequested :: NumTxIdsToReq
unackedAndRequested = Int -> NumTxIdsToReq
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
numOfUnacked NumTxIdsToReq -> NumTxIdsToReq -> NumTxIdsToReq
forall a. Num a => a -> a -> a
+ NumTxIdsToReq
requestedTxIdsInflight

    txIdsToRequest :: NumTxIdsToReq
txIdsToRequest =
        Bool -> NumTxIdsToReq -> NumTxIdsToReq
forall a. HasCallStack => Bool -> a -> a
assert (NumTxIdsToReq
unackedAndRequested NumTxIdsToReq -> NumTxIdsToReq -> Bool
forall a. Ord a => a -> a -> Bool
<= NumTxIdsToReq
maxUnacknowledgedTxIds) (NumTxIdsToReq -> NumTxIdsToReq) -> NumTxIdsToReq -> NumTxIdsToReq
forall a b. (a -> b) -> a -> b
$
        Bool -> NumTxIdsToReq -> NumTxIdsToReq
forall a. HasCallStack => Bool -> a -> a
assert (NumTxIdsToReq
requestedTxIdsInflight NumTxIdsToReq -> NumTxIdsToReq -> Bool
forall a. Ord a => a -> a -> Bool
<= NumTxIdsToReq
maxNumTxIdsToRequest) (NumTxIdsToReq -> NumTxIdsToReq) -> NumTxIdsToReq -> NumTxIdsToReq
forall a b. (a -> b) -> a -> b
$
        (NumTxIdsToReq
maxUnacknowledgedTxIds NumTxIdsToReq -> NumTxIdsToReq -> NumTxIdsToReq
forall a. Num a => a -> a -> a
- NumTxIdsToReq
unackedAndRequested NumTxIdsToReq -> NumTxIdsToReq -> NumTxIdsToReq
forall a. Num a => a -> a -> a
+ Int -> NumTxIdsToReq
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
numOfAcked)
        NumTxIdsToReq -> NumTxIdsToReq -> NumTxIdsToReq
forall a. Ord a => a -> a -> a
`min`
        (NumTxIdsToReq
maxNumTxIdsToRequest NumTxIdsToReq -> NumTxIdsToReq -> NumTxIdsToReq
forall a. Num a => a -> a -> a
- NumTxIdsToReq
requestedTxIdsInflight)


-- | `RefCountDiff` represents a map of `txid` which can be acknowledged
-- together with their multiplicities.
--
newtype RefCountDiff txid = RefCountDiff {
    forall txid. RefCountDiff txid -> Map txid Int
txIdsToAck :: Map txid Int
  }

updateRefCounts :: Ord txid
                => Map txid Int
                -> RefCountDiff txid
                -> Map txid Int
updateRefCounts :: forall txid.
Ord txid =>
Map txid Int -> RefCountDiff txid -> Map txid Int
updateRefCounts Map txid Int
referenceCounts (RefCountDiff Map txid Int
diff) =
    SimpleWhenMissing txid Int Int
-> SimpleWhenMissing txid Int Int
-> SimpleWhenMatched txid Int Int Int
-> Map txid Int
-> Map txid Int
-> Map txid Int
forall k a c b.
Ord k =>
SimpleWhenMissing k a c
-> SimpleWhenMissing k b c
-> SimpleWhenMatched k a b c
-> Map k a
-> Map k b
-> Map k c
Map.merge ((txid -> Int -> Maybe Int) -> SimpleWhenMissing txid Int Int
forall (f :: * -> *) k x y.
Applicative f =>
(k -> x -> Maybe y) -> WhenMissing f k x y
Map.mapMaybeMissing \txid
_ Int
x -> Int -> Maybe Int
forall a. a -> Maybe a
Just Int
x)
              ((txid -> Int -> Maybe Int) -> SimpleWhenMissing txid Int Int
forall (f :: * -> *) k x y.
Applicative f =>
(k -> x -> Maybe y) -> WhenMissing f k x y
Map.mapMaybeMissing \txid
_ Int
_ -> Maybe Int
forall a. Maybe a
Nothing)
              ((txid -> Int -> Int -> Maybe Int)
-> SimpleWhenMatched txid Int Int Int
forall (f :: * -> *) k x y z.
Applicative f =>
(k -> x -> y -> Maybe z) -> WhenMatched f k x y z
Map.zipWithMaybeMatched \txid
_ Int
x Int
y -> Bool -> Maybe Int -> Maybe Int
forall a. HasCallStack => Bool -> a -> a
assert (Int
x Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
y)
                                                 if Int
x Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
y then Int -> Maybe Int
forall a. a -> Maybe a
Just (Int -> Maybe Int) -> Int -> Maybe Int
forall a b. (a -> b) -> a -> b
$! Int
x Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
y
                                                          else Maybe Int
forall a. Maybe a
Nothing)
              Map txid Int
referenceCounts
              Map txid Int
diff


tickTimedTxs :: forall peeraddr tx txid.
                (Ord txid)
             => Time
             -> SharedTxState peeraddr txid tx
             -> SharedTxState peeraddr txid tx
tickTimedTxs :: forall peeraddr tx txid.
Ord txid =>
Time
-> SharedTxState peeraddr txid tx -> SharedTxState peeraddr txid tx
tickTimedTxs Time
now st :: SharedTxState peeraddr txid tx
st@SharedTxState{ Map Time [txid]
timedTxs :: Map Time [txid]
timedTxs :: forall peeraddr txid tx.
SharedTxState peeraddr txid tx -> Map Time [txid]
timedTxs
                                 , Map txid Int
referenceCounts :: Map txid Int
referenceCounts :: forall peeraddr txid tx.
SharedTxState peeraddr txid tx -> Map txid Int
referenceCounts
                                 , Map txid (Maybe tx)
bufferedTxs :: forall peeraddr txid tx.
SharedTxState peeraddr txid tx -> Map txid (Maybe tx)
bufferedTxs :: Map txid (Maybe tx)
bufferedTxs } =
    let (Map Time [txid]
expiredTxs', Map Time [txid]
timedTxs') =
          case Time
-> Map Time [txid]
-> (Map Time [txid], Maybe [txid], Map Time [txid])
forall k a. Ord k => k -> Map k a -> (Map k a, Maybe a, Map k a)
Map.splitLookup Time
now Map Time [txid]
timedTxs of
            (Map Time [txid]
expired, Just [txid]
txids, Map Time [txid]
timed) ->
              (Map Time [txid]
expired, -- Map.split doesn't include the `now` entry in the map
                        Time -> [txid] -> Map Time [txid] -> Map Time [txid]
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert Time
now [txid]
txids Map Time [txid]
timed)
            (Map Time [txid]
expired, Maybe [txid]
Nothing, Map Time [txid]
timed) ->
              (Map Time [txid]
expired, Map Time [txid]
timed)
        refDiff :: Map txid Int
refDiff = (Map txid Int -> [txid] -> Map txid Int)
-> Map txid Int -> Map Time [txid] -> Map txid Int
forall a b k. (a -> b -> a) -> a -> Map k b -> a
Map.foldl' Map txid Int -> [txid] -> Map txid Int
fn Map txid Int
forall k a. Map k a
Map.empty Map Time [txid]
expiredTxs'
        referenceCounts' :: Map txid Int
referenceCounts' = Map txid Int -> RefCountDiff txid -> Map txid Int
forall txid.
Ord txid =>
Map txid Int -> RefCountDiff txid -> Map txid Int
updateRefCounts Map txid Int
referenceCounts (Map txid Int -> RefCountDiff txid
forall txid. Map txid Int -> RefCountDiff txid
RefCountDiff Map txid Int
refDiff)
        liveSet :: Set txid
liveSet = Map txid Int -> Set txid
forall k a. Map k a -> Set k
Map.keysSet Map txid Int
referenceCounts'
        bufferedTxs' :: Map txid (Maybe tx)
bufferedTxs' = Map txid (Maybe tx)
bufferedTxs Map txid (Maybe tx) -> Set txid -> Map txid (Maybe tx)
forall k a. Ord k => Map k a -> Set k -> Map k a
`Map.restrictKeys` Set txid
liveSet in
    SharedTxState peeraddr txid tx
st { timedTxs        = timedTxs'
       , referenceCounts = referenceCounts'
       , bufferedTxs     = bufferedTxs'
       }
  where
    fn :: Map txid Int
       -> [txid]
       -> Map txid Int
    fn :: Map txid Int -> [txid] -> Map txid Int
fn Map txid Int
m [txid]
txids = (Map txid Int -> txid -> Map txid Int)
-> Map txid Int -> [txid] -> Map txid Int
forall b a. (b -> a -> b) -> b -> [a] -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
Foldable.foldl' Map txid Int -> txid -> Map txid Int
gn Map txid Int
m [txid]
txids

    gn :: Map txid Int
       -> txid
       -> Map txid Int
    gn :: Map txid Int -> txid -> Map txid Int
gn Map txid Int
m txid
txid = (Maybe Int -> Maybe Int) -> txid -> Map txid Int -> Map txid Int
forall k a.
Ord k =>
(Maybe a -> Maybe a) -> k -> Map k a -> Map k a
Map.alter Maybe Int -> Maybe Int
af txid
txid Map txid Int
m

    af :: Maybe Int
       -> Maybe Int
    af :: Maybe Int -> Maybe Int
af Maybe Int
Nothing  = Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1
    af (Just Int
n) = Int -> Maybe Int
forall a. a -> Maybe a
Just (Int -> Maybe Int) -> Int -> Maybe Int
forall a b. (a -> b) -> a -> b
$! Int -> Int
forall a. Enum a => a -> a
succ Int
n

--
-- Pure internal API
--

-- | Insert received `txid`s and return the number of txids to be acknowledged
-- and the updated `SharedTxState`.
--
receivedTxIdsImpl
    :: forall peeraddr tx txid.
       (Ord txid, Ord peeraddr, HasCallStack)
    => (txid -> Bool)      -- ^ check if txid is in the mempool, ref
                           -- 'mempoolHasTx'
    -> peeraddr
    -> NumTxIdsToReq
    -- ^ number of requests to subtract from
    -- `requestedTxIdsInflight`

    -> StrictSeq txid
    -- ^ sequence of received `txids`
    -> Map txid SizeInBytes
    -- ^ received `txid`s with sizes

    -> SharedTxState peeraddr txid tx
    -> SharedTxState peeraddr txid tx

receivedTxIdsImpl :: forall peeraddr tx txid.
(Ord txid, Ord peeraddr, HasCallStack) =>
(txid -> Bool)
-> peeraddr
-> NumTxIdsToReq
-> StrictSeq txid
-> Map txid SizeInBytes
-> SharedTxState peeraddr txid tx
-> SharedTxState peeraddr txid tx
receivedTxIdsImpl
    txid -> Bool
mempoolHasTx
    peeraddr
peeraddr NumTxIdsToReq
reqNo StrictSeq txid
txidsSeq Map txid SizeInBytes
txidsMap
    st :: SharedTxState peeraddr txid tx
st@SharedTxState{ Map peeraddr (PeerTxState txid tx)
peerTxStates :: Map peeraddr (PeerTxState txid tx)
peerTxStates :: forall peeraddr txid tx.
SharedTxState peeraddr txid tx
-> Map peeraddr (PeerTxState txid tx)
peerTxStates,
                      Map txid (Maybe tx)
bufferedTxs :: forall peeraddr txid tx.
SharedTxState peeraddr txid tx -> Map txid (Maybe tx)
bufferedTxs :: Map txid (Maybe tx)
bufferedTxs,
                      Map txid Int
referenceCounts :: forall peeraddr txid tx.
SharedTxState peeraddr txid tx -> Map txid Int
referenceCounts :: Map txid Int
referenceCounts }
    =
    -- using `alterF` so the update of `PeerTxState` is done in one lookup
    case (Maybe (PeerTxState txid tx)
 -> (SharedTxState peeraddr txid tx, Maybe (PeerTxState txid tx)))
-> peeraddr
-> Map peeraddr (PeerTxState txid tx)
-> (SharedTxState peeraddr txid tx,
    Map peeraddr (PeerTxState txid tx))
forall (f :: * -> *) k a.
(Functor f, Ord k) =>
(Maybe a -> f (Maybe a)) -> k -> Map k a -> f (Map k a)
Map.alterF ((PeerTxState txid tx -> Maybe (PeerTxState txid tx))
-> (SharedTxState peeraddr txid tx, PeerTxState txid tx)
-> (SharedTxState peeraddr txid tx, Maybe (PeerTxState txid tx))
forall a b.
(a -> b)
-> (SharedTxState peeraddr txid tx, a)
-> (SharedTxState peeraddr txid tx, b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap PeerTxState txid tx -> Maybe (PeerTxState txid tx)
forall a. a -> Maybe a
Just ((SharedTxState peeraddr txid tx, PeerTxState txid tx)
 -> (SharedTxState peeraddr txid tx, Maybe (PeerTxState txid tx)))
-> (Maybe (PeerTxState txid tx)
    -> (SharedTxState peeraddr txid tx, PeerTxState txid tx))
-> Maybe (PeerTxState txid tx)
-> (SharedTxState peeraddr txid tx, Maybe (PeerTxState txid tx))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PeerTxState txid tx
-> (SharedTxState peeraddr txid tx, PeerTxState txid tx)
fn (PeerTxState txid tx
 -> (SharedTxState peeraddr txid tx, PeerTxState txid tx))
-> (Maybe (PeerTxState txid tx) -> PeerTxState txid tx)
-> Maybe (PeerTxState txid tx)
-> (SharedTxState peeraddr txid tx, PeerTxState txid tx)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe (PeerTxState txid tx) -> PeerTxState txid tx
forall a. HasCallStack => Maybe a -> a
fromJust)
                    peeraddr
peeraddr
                    Map peeraddr (PeerTxState txid tx)
peerTxStates of
      ( SharedTxState peeraddr txid tx
st', Map peeraddr (PeerTxState txid tx)
peerTxStates' ) ->
        SharedTxState peeraddr txid tx
st' { peerTxStates = peerTxStates' }

  where
    -- update `PeerTxState` and return number of `txid`s to acknowledged and
    -- updated `SharedTxState`.
    fn :: PeerTxState txid tx
       -> ( SharedTxState peeraddr txid tx
          , PeerTxState txid tx
          )
    fn :: PeerTxState txid tx
-> (SharedTxState peeraddr txid tx, PeerTxState txid tx)
fn ps :: PeerTxState txid tx
ps@PeerTxState { Map txid SizeInBytes
availableTxIds :: forall txid tx. PeerTxState txid tx -> Map txid SizeInBytes
availableTxIds :: Map txid SizeInBytes
availableTxIds,
                        NumTxIdsToReq
requestedTxIdsInflight :: forall txid tx. PeerTxState txid tx -> NumTxIdsToReq
requestedTxIdsInflight :: NumTxIdsToReq
requestedTxIdsInflight,
                        StrictSeq txid
unacknowledgedTxIds :: forall txid tx. PeerTxState txid tx -> StrictSeq txid
unacknowledgedTxIds :: StrictSeq txid
unacknowledgedTxIds } =
        (SharedTxState peeraddr txid tx
st', PeerTxState txid tx
ps')
      where
        --
        -- Handle new `txid`s
        --

        -- Divide the new txids in two: those that are already in the mempool
        -- and those that are not. We'll request some txs from the latter.
        (Map txid SizeInBytes
ignoredTxIds, Map txid SizeInBytes
availableTxIdsMap) =
          (txid -> SizeInBytes -> Bool)
-> Map txid SizeInBytes
-> (Map txid SizeInBytes, Map txid SizeInBytes)
forall k a. (k -> a -> Bool) -> Map k a -> (Map k a, Map k a)
Map.partitionWithKey
            (\txid
txid SizeInBytes
_ -> txid -> Bool
mempoolHasTx txid
txid)
            Map txid SizeInBytes
txidsMap

        -- Add all `txids` from `availableTxIdsMap` which are not
        -- unacknowledged or already buffered. Unacknowledged txids must have
        -- already been added to `availableTxIds` map before.
        availableTxIds' :: Map txid SizeInBytes
availableTxIds' =
          (Map txid SizeInBytes
 -> txid -> SizeInBytes -> Map txid SizeInBytes)
-> Map txid SizeInBytes
-> Map txid SizeInBytes
-> Map txid SizeInBytes
forall a k b. (a -> k -> b -> a) -> a -> Map k b -> a
Map.foldlWithKey
            (\Map txid SizeInBytes
m txid
txid SizeInBytes
sizeInBytes -> txid -> SizeInBytes -> Map txid SizeInBytes -> Map txid SizeInBytes
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert txid
txid SizeInBytes
sizeInBytes Map txid SizeInBytes
m)
            Map txid SizeInBytes
availableTxIds
            ((txid -> SizeInBytes -> Bool)
-> Map txid SizeInBytes -> Map txid SizeInBytes
forall k a. (k -> a -> Bool) -> Map k a -> Map k a
Map.filterWithKey
                (\txid
txid SizeInBytes
_ -> txid
txid txid -> StrictSeq txid -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`notElem` StrictSeq txid
unacknowledgedTxIds
                         Bool -> Bool -> Bool
&& txid
txid txid -> Map txid (Maybe tx) -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`Map.notMember` Map txid (Maybe tx)
bufferedTxs)
                Map txid SizeInBytes
availableTxIdsMap)

        -- Add received txids to `unacknowledgedTxIds`.
        unacknowledgedTxIds' :: StrictSeq txid
unacknowledgedTxIds' = StrictSeq txid
unacknowledgedTxIds StrictSeq txid -> StrictSeq txid -> StrictSeq txid
forall a. Semigroup a => a -> a -> a
<> StrictSeq txid
txidsSeq

        -- Add ignored `txs` to buffered ones.
        -- Note: we prefer to keep the `tx` if it's already in `bufferedTxs`.
        bufferedTxs' :: Map txid (Maybe tx)
bufferedTxs' = Map txid (Maybe tx)
bufferedTxs
                    Map txid (Maybe tx) -> Map txid (Maybe tx) -> Map txid (Maybe tx)
forall a. Semigroup a => a -> a -> a
<> (SizeInBytes -> Maybe tx)
-> Map txid SizeInBytes -> Map txid (Maybe tx)
forall a b k. (a -> b) -> Map k a -> Map k b
Map.map (Maybe tx -> SizeInBytes -> Maybe tx
forall a b. a -> b -> a
const Maybe tx
forall a. Maybe a
Nothing) Map txid SizeInBytes
ignoredTxIds

        referenceCounts' :: Map txid Int
referenceCounts' =
          (Map txid Int -> txid -> Map txid Int)
-> Map txid Int -> StrictSeq txid -> Map txid Int
forall b a. (b -> a -> b) -> b -> StrictSeq a -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
Foldable.foldl'
            ((txid -> Map txid Int -> Map txid Int)
-> Map txid Int -> txid -> Map txid Int
forall a b c. (a -> b -> c) -> b -> a -> c
flip ((txid -> Map txid Int -> Map txid Int)
 -> Map txid Int -> txid -> Map txid Int)
-> (txid -> Map txid Int -> Map txid Int)
-> Map txid Int
-> txid
-> Map txid Int
forall a b. (a -> b) -> a -> b
$ (Maybe Int -> Maybe Int) -> txid -> Map txid Int -> Map txid Int
forall k a.
Ord k =>
(Maybe a -> Maybe a) -> k -> Map k a -> Map k a
Map.alter (\case
                                 Maybe Int
Nothing  -> Int -> Maybe Int
forall a. a -> Maybe a
Just (Int -> Maybe Int) -> Int -> Maybe Int
forall a b. (a -> b) -> a -> b
$! Int
1
                                 Just Int
cnt -> Int -> Maybe Int
forall a. a -> Maybe a
Just (Int -> Maybe Int) -> Int -> Maybe Int
forall a b. (a -> b) -> a -> b
$! Int -> Int
forall a. Enum a => a -> a
succ Int
cnt))
            Map txid Int
referenceCounts
            StrictSeq txid
txidsSeq

        st' :: SharedTxState peeraddr txid tx
st' = SharedTxState peeraddr txid tx
st { bufferedTxs     = bufferedTxs',
                   referenceCounts = referenceCounts' }
        ps' :: PeerTxState txid tx
ps' = Bool -> PeerTxState txid tx -> PeerTxState txid tx
forall a. HasCallStack => Bool -> a -> a
assert (NumTxIdsToReq
requestedTxIdsInflight NumTxIdsToReq -> NumTxIdsToReq -> Bool
forall a. Ord a => a -> a -> Bool
>= NumTxIdsToReq
reqNo)
              PeerTxState txid tx
ps { availableTxIds         = availableTxIds',
                   unacknowledgedTxIds    = unacknowledgedTxIds',
                   requestedTxIdsInflight = requestedTxIdsInflight - reqNo }

-- | We check advertised sizes up in a fuzzy way.  The advertised and received
-- sizes need to agree up to `const_MAX_TX_SIZE_DISCREPENCY`.
--
const_MAX_TX_SIZE_DISCREPENCY :: SizeInBytes
const_MAX_TX_SIZE_DISCREPENCY :: SizeInBytes
const_MAX_TX_SIZE_DISCREPENCY = SizeInBytes
32

collectTxsImpl
    :: forall peeraddr tx txid.
       ( Ord peeraddr
       , Ord txid
       , Show txid
       , Typeable txid
       )
    => (tx -> SizeInBytes) -- ^ compute tx size
    -> peeraddr
    -> Map txid SizeInBytes -- ^ requested txids
    -> Map txid tx          -- ^ received txs
    -> SharedTxState peeraddr txid tx
    -> Either TxSubmissionProtocolError
              (SharedTxState peeraddr txid tx)
    -- ^ Return list of `txid` which sizes didn't match or a new state.
    -- If one of the `tx` has wrong size, we return an error.  The
    -- mini-protocol will throw, which will clean the state map from this peer.
collectTxsImpl :: forall peeraddr tx txid.
(Ord peeraddr, Ord txid, Show txid, Typeable txid) =>
(tx -> SizeInBytes)
-> peeraddr
-> Map txid SizeInBytes
-> Map txid tx
-> SharedTxState peeraddr txid tx
-> Either
     TxSubmissionProtocolError (SharedTxState peeraddr txid tx)
collectTxsImpl tx -> SizeInBytes
txSize peeraddr
peeraddr Map txid SizeInBytes
requestedTxIdsMap Map txid tx
receivedTxs
               st :: SharedTxState peeraddr txid tx
st@SharedTxState { Map peeraddr (PeerTxState txid tx)
peerTxStates :: forall peeraddr txid tx.
SharedTxState peeraddr txid tx
-> Map peeraddr (PeerTxState txid tx)
peerTxStates :: Map peeraddr (PeerTxState txid tx)
peerTxStates } =

    -- using `alterF` so the update of `PeerTxState` is done in one lookup
    case (Maybe (PeerTxState txid tx)
 -> (Either
       [(txid, SizeInBytes, SizeInBytes)]
       (SharedTxState peeraddr txid tx),
     Maybe (PeerTxState txid tx)))
-> peeraddr
-> Map peeraddr (PeerTxState txid tx)
-> (Either
      [(txid, SizeInBytes, SizeInBytes)]
      (SharedTxState peeraddr txid tx),
    Map peeraddr (PeerTxState txid tx))
forall (f :: * -> *) k a.
(Functor f, Ord k) =>
(Maybe a -> f (Maybe a)) -> k -> Map k a -> f (Map k a)
Map.alterF ((PeerTxState txid tx -> Maybe (PeerTxState txid tx))
-> (Either
      [(txid, SizeInBytes, SizeInBytes)]
      (SharedTxState peeraddr txid tx),
    PeerTxState txid tx)
-> (Either
      [(txid, SizeInBytes, SizeInBytes)]
      (SharedTxState peeraddr txid tx),
    Maybe (PeerTxState txid tx))
forall a b.
(a -> b)
-> (Either
      [(txid, SizeInBytes, SizeInBytes)]
      (SharedTxState peeraddr txid tx),
    a)
-> (Either
      [(txid, SizeInBytes, SizeInBytes)]
      (SharedTxState peeraddr txid tx),
    b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap PeerTxState txid tx -> Maybe (PeerTxState txid tx)
forall a. a -> Maybe a
Just ((Either
    [(txid, SizeInBytes, SizeInBytes)]
    (SharedTxState peeraddr txid tx),
  PeerTxState txid tx)
 -> (Either
       [(txid, SizeInBytes, SizeInBytes)]
       (SharedTxState peeraddr txid tx),
     Maybe (PeerTxState txid tx)))
-> (Maybe (PeerTxState txid tx)
    -> (Either
          [(txid, SizeInBytes, SizeInBytes)]
          (SharedTxState peeraddr txid tx),
        PeerTxState txid tx))
-> Maybe (PeerTxState txid tx)
-> (Either
      [(txid, SizeInBytes, SizeInBytes)]
      (SharedTxState peeraddr txid tx),
    Maybe (PeerTxState txid tx))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PeerTxState txid tx
-> (Either
      [(txid, SizeInBytes, SizeInBytes)]
      (SharedTxState peeraddr txid tx),
    PeerTxState txid tx)
fn (PeerTxState txid tx
 -> (Either
       [(txid, SizeInBytes, SizeInBytes)]
       (SharedTxState peeraddr txid tx),
     PeerTxState txid tx))
-> (Maybe (PeerTxState txid tx) -> PeerTxState txid tx)
-> Maybe (PeerTxState txid tx)
-> (Either
      [(txid, SizeInBytes, SizeInBytes)]
      (SharedTxState peeraddr txid tx),
    PeerTxState txid tx)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe (PeerTxState txid tx) -> PeerTxState txid tx
forall a. HasCallStack => Maybe a -> a
fromJust)
                    peeraddr
peeraddr
                    Map peeraddr (PeerTxState txid tx)
peerTxStates of
      (Right SharedTxState peeraddr txid tx
st', Map peeraddr (PeerTxState txid tx)
peerTxStates') ->
        SharedTxState peeraddr txid tx
-> Either
     TxSubmissionProtocolError (SharedTxState peeraddr txid tx)
forall a b. b -> Either a b
Right SharedTxState peeraddr txid tx
st' { peerTxStates = peerTxStates' }
      (Left [(txid, SizeInBytes, SizeInBytes)]
e, Map peeraddr (PeerTxState txid tx)
_) ->
        TxSubmissionProtocolError
-> Either
     TxSubmissionProtocolError (SharedTxState peeraddr txid tx)
forall a b. a -> Either a b
Left (TxSubmissionProtocolError
 -> Either
      TxSubmissionProtocolError (SharedTxState peeraddr txid tx))
-> TxSubmissionProtocolError
-> Either
     TxSubmissionProtocolError (SharedTxState peeraddr txid tx)
forall a b. (a -> b) -> a -> b
$ [(txid, SizeInBytes, SizeInBytes)] -> TxSubmissionProtocolError
forall txid.
(Typeable txid, Show txid, Eq txid) =>
[(txid, SizeInBytes, SizeInBytes)] -> TxSubmissionProtocolError
ProtocolErrorTxSizeError [(txid, SizeInBytes, SizeInBytes)]
e

  where
    -- Update `PeerTxState` and partially update `SharedTxState` (except of
    -- `peerTxStates`).
    fn :: PeerTxState txid tx
       -> ( Either [(txid, SizeInBytes, SizeInBytes)]
                   (SharedTxState peeraddr txid tx)
          , PeerTxState txid tx
          )
    fn :: PeerTxState txid tx
-> (Either
      [(txid, SizeInBytes, SizeInBytes)]
      (SharedTxState peeraddr txid tx),
    PeerTxState txid tx)
fn PeerTxState txid tx
ps =
        case [(txid, SizeInBytes, SizeInBytes)]
wrongSizedTxs of
          [] -> ( SharedTxState peeraddr txid tx
-> Either
     [(txid, SizeInBytes, SizeInBytes)] (SharedTxState peeraddr txid tx)
forall a b. b -> Either a b
Right SharedTxState peeraddr txid tx
st'
                , PeerTxState txid tx
ps''
                )
          [(txid, SizeInBytes, SizeInBytes)]
_  -> ( [(txid, SizeInBytes, SizeInBytes)]
-> Either
     [(txid, SizeInBytes, SizeInBytes)] (SharedTxState peeraddr txid tx)
forall a b. a -> Either a b
Left [(txid, SizeInBytes, SizeInBytes)]
wrongSizedTxs
                , PeerTxState txid tx
ps
                )
      where
        wrongSizedTxs :: [(txid, SizeInBytes, SizeInBytes)]
        wrongSizedTxs :: [(txid, SizeInBytes, SizeInBytes)]
wrongSizedTxs =
            ((txid, (SizeInBytes, SizeInBytes))
 -> (txid, SizeInBytes, SizeInBytes))
-> [(txid, (SizeInBytes, SizeInBytes))]
-> [(txid, SizeInBytes, SizeInBytes)]
forall a b. (a -> b) -> [a] -> [b]
map (\(txid
a, (SizeInBytes
b,SizeInBytes
c)) -> (txid
a,SizeInBytes
b,SizeInBytes
c))
          ([(txid, (SizeInBytes, SizeInBytes))]
 -> [(txid, SizeInBytes, SizeInBytes)])
-> (Map txid (SizeInBytes, SizeInBytes)
    -> [(txid, (SizeInBytes, SizeInBytes))])
-> Map txid (SizeInBytes, SizeInBytes)
-> [(txid, SizeInBytes, SizeInBytes)]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Map txid (SizeInBytes, SizeInBytes)
-> [(txid, (SizeInBytes, SizeInBytes))]
forall k a. Map k a -> [(k, a)]
Map.toList
          (Map txid (SizeInBytes, SizeInBytes)
 -> [(txid, SizeInBytes, SizeInBytes)])
-> Map txid (SizeInBytes, SizeInBytes)
-> [(txid, SizeInBytes, SizeInBytes)]
forall a b. (a -> b) -> a -> b
$ SimpleWhenMissing txid SizeInBytes (SizeInBytes, SizeInBytes)
-> SimpleWhenMissing txid SizeInBytes (SizeInBytes, SizeInBytes)
-> SimpleWhenMatched
     txid SizeInBytes SizeInBytes (SizeInBytes, SizeInBytes)
-> Map txid SizeInBytes
-> Map txid SizeInBytes
-> Map txid (SizeInBytes, SizeInBytes)
forall k a c b.
Ord k =>
SimpleWhenMissing k a c
-> SimpleWhenMissing k b c
-> SimpleWhenMatched k a b c
-> Map k a
-> Map k b
-> Map k c
Map.merge
              SimpleWhenMissing txid SizeInBytes (SizeInBytes, SizeInBytes)
forall (f :: * -> *) k x y. Applicative f => WhenMissing f k x y
Map.dropMissing
              SimpleWhenMissing txid SizeInBytes (SizeInBytes, SizeInBytes)
forall (f :: * -> *) k x y. Applicative f => WhenMissing f k x y
Map.dropMissing
              ((txid
 -> SizeInBytes -> SizeInBytes -> Maybe (SizeInBytes, SizeInBytes))
-> SimpleWhenMatched
     txid SizeInBytes SizeInBytes (SizeInBytes, SizeInBytes)
forall (f :: * -> *) k x y z.
Applicative f =>
(k -> x -> y -> Maybe z) -> WhenMatched f k x y z
Map.zipWithMaybeMatched \txid
_ SizeInBytes
receivedSize SizeInBytes
advertisedSize ->
                if SizeInBytes
receivedSize SizeInBytes -> SizeInBytes -> Bool
`checkTxSize` SizeInBytes
advertisedSize
                  then Maybe (SizeInBytes, SizeInBytes)
forall a. Maybe a
Nothing
                  else (SizeInBytes, SizeInBytes) -> Maybe (SizeInBytes, SizeInBytes)
forall a. a -> Maybe a
Just (SizeInBytes
receivedSize, SizeInBytes
advertisedSize)
              )
              (tx -> SizeInBytes
txSize (tx -> SizeInBytes) -> Map txid tx -> Map txid SizeInBytes
forall a b k. (a -> b) -> Map k a -> Map k b
`Map.map` Map txid tx
receivedTxs)
              Map txid SizeInBytes
requestedTxIdsMap

        checkTxSize :: SizeInBytes
                    -> SizeInBytes
                    -> Bool
        checkTxSize :: SizeInBytes -> SizeInBytes -> Bool
checkTxSize SizeInBytes
received SizeInBytes
advertised
          | SizeInBytes
received SizeInBytes -> SizeInBytes -> Bool
forall a. Ord a => a -> a -> Bool
> SizeInBytes
advertised
          = SizeInBytes
received SizeInBytes -> SizeInBytes -> SizeInBytes
forall a. Num a => a -> a -> a
- SizeInBytes
advertised SizeInBytes -> SizeInBytes -> Bool
forall a. Ord a => a -> a -> Bool
<= SizeInBytes
const_MAX_TX_SIZE_DISCREPENCY
          | Bool
otherwise
          = SizeInBytes
advertised SizeInBytes -> SizeInBytes -> SizeInBytes
forall a. Num a => a -> a -> a
- SizeInBytes
received SizeInBytes -> SizeInBytes -> Bool
forall a. Ord a => a -> a -> Bool
<= SizeInBytes
const_MAX_TX_SIZE_DISCREPENCY

        requestedTxIds :: Set txid
requestedTxIds = Map txid SizeInBytes -> Set txid
forall k a. Map k a -> Set k
Map.keysSet Map txid SizeInBytes
requestedTxIdsMap
        notReceived :: Set txid
notReceived    = Set txid
requestedTxIds Set txid -> Set txid -> Set txid
forall a. Ord a => Set a -> Set a -> Set a
Set.\\ Map txid tx -> Set txid
forall k a. Map k a -> Set k
Map.keysSet Map txid tx
receivedTxs
        downloadedTxs' :: Map txid tx
downloadedTxs' = PeerTxState txid tx -> Map txid tx
forall txid tx. PeerTxState txid tx -> Map txid tx
downloadedTxs PeerTxState txid tx
ps Map txid tx -> Map txid tx -> Map txid tx
forall a. Semigroup a => a -> a -> a
<> Map txid tx
receivedTxs
        -- Add not received txs to `unknownTxs` before acknowledging txids.
        unknownTxs' :: Set txid
unknownTxs'    = PeerTxState txid tx -> Set txid
forall txid tx. PeerTxState txid tx -> Set txid
unknownTxs PeerTxState txid tx
ps Set txid -> Set txid -> Set txid
forall a. Semigroup a => a -> a -> a
<> Set txid
notReceived

        requestedTxsInflight' :: Set txid
requestedTxsInflight' =
          Bool -> Set txid -> Set txid
forall a. HasCallStack => Bool -> a -> a
assert (Set txid
requestedTxIds Set txid -> Set txid -> Bool
forall a. Ord a => Set a -> Set a -> Bool
`Set.isSubsetOf` PeerTxState txid tx -> Set txid
forall txid tx. PeerTxState txid tx -> Set txid
requestedTxsInflight PeerTxState txid tx
ps) (Set txid -> Set txid) -> Set txid -> Set txid
forall a b. (a -> b) -> a -> b
$
          PeerTxState txid tx -> Set txid
forall txid tx. PeerTxState txid tx -> Set txid
requestedTxsInflight PeerTxState txid tx
ps  Set txid -> Set txid -> Set txid
forall a. Ord a => Set a -> Set a -> Set a
Set.\\ Set txid
requestedTxIds

        requestedSize :: SizeInBytes
requestedSize = Map txid SizeInBytes -> SizeInBytes
forall m. Monoid m => Map txid m -> m
forall (t :: * -> *) m. (Foldable t, Monoid m) => t m -> m
fold (Map txid SizeInBytes -> SizeInBytes)
-> Map txid SizeInBytes -> SizeInBytes
forall a b. (a -> b) -> a -> b
$ PeerTxState txid tx -> Map txid SizeInBytes
forall txid tx. PeerTxState txid tx -> Map txid SizeInBytes
availableTxIds PeerTxState txid tx
ps Map txid SizeInBytes -> Set txid -> Map txid SizeInBytes
forall k a. Ord k => Map k a -> Set k -> Map k a
`Map.restrictKeys` Set txid
requestedTxIds
        requestedTxsInflightSize' :: SizeInBytes
requestedTxsInflightSize' =
          Bool -> SizeInBytes -> SizeInBytes
forall a. HasCallStack => Bool -> a -> a
assert (PeerTxState txid tx -> SizeInBytes
forall txid tx. PeerTxState txid tx -> SizeInBytes
requestedTxsInflightSize PeerTxState txid tx
ps SizeInBytes -> SizeInBytes -> Bool
forall a. Ord a => a -> a -> Bool
>= SizeInBytes
requestedSize) (SizeInBytes -> SizeInBytes) -> SizeInBytes -> SizeInBytes
forall a b. (a -> b) -> a -> b
$
          PeerTxState txid tx -> SizeInBytes
forall txid tx. PeerTxState txid tx -> SizeInBytes
requestedTxsInflightSize PeerTxState txid tx
ps SizeInBytes -> SizeInBytes -> SizeInBytes
forall a. Num a => a -> a -> a
- SizeInBytes
requestedSize

        -- subtract requested from in-flight
        inflightTxs'' :: Map txid Int
inflightTxs'' =
          SimpleWhenMissing txid Int Int
-> SimpleWhenMissing txid Int Int
-> SimpleWhenMatched txid Int Int Int
-> Map txid Int
-> Map txid Int
-> Map txid Int
forall k a c b.
Ord k =>
SimpleWhenMissing k a c
-> SimpleWhenMissing k b c
-> SimpleWhenMatched k a b c
-> Map k a
-> Map k b
-> Map k c
Map.merge
            ((txid -> Int -> Maybe Int) -> SimpleWhenMissing txid Int Int
forall (f :: * -> *) k x y.
Applicative f =>
(k -> x -> Maybe y) -> WhenMissing f k x y
Map.mapMaybeMissing \txid
_ Int
x -> Int -> Maybe Int
forall a. a -> Maybe a
Just Int
x)
            ((txid -> Int -> Maybe Int) -> SimpleWhenMissing txid Int Int
forall (f :: * -> *) k x y.
Applicative f =>
(k -> x -> Maybe y) -> WhenMissing f k x y
Map.mapMaybeMissing \txid
_ Int
_ -> Bool -> Maybe Int -> Maybe Int
forall a. HasCallStack => Bool -> a -> a
assert Bool
False Maybe Int
forall a. Maybe a
Nothing)
            ((txid -> Int -> Int -> Maybe Int)
-> SimpleWhenMatched txid Int Int Int
forall (f :: * -> *) k x y z.
Applicative f =>
(k -> x -> y -> Maybe z) -> WhenMatched f k x y z
Map.zipWithMaybeMatched \txid
_ Int
x Int
y -> Bool -> Maybe Int -> Maybe Int
forall a. HasCallStack => Bool -> a -> a
assert (Int
x Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
y)
                                               let z :: Int
z = Int
x Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
y in
                                               if Int
z Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
                                               then Int -> Maybe Int
forall a. a -> Maybe a
Just Int
z
                                               else Maybe Int
forall a. Maybe a
Nothing)
            (SharedTxState peeraddr txid tx -> Map txid Int
forall peeraddr txid tx.
SharedTxState peeraddr txid tx -> Map txid Int
inflightTxs SharedTxState peeraddr txid tx
st)
            ((txid -> Int) -> Set txid -> Map txid Int
forall k a. (k -> a) -> Set k -> Map k a
Map.fromSet (Int -> txid -> Int
forall a b. a -> b -> a
const Int
1) Set txid
requestedTxIds)

        inflightTxsSize'' :: SizeInBytes
inflightTxsSize'' = Bool -> SizeInBytes -> SizeInBytes
forall a. HasCallStack => Bool -> a -> a
assert (SharedTxState peeraddr txid tx -> SizeInBytes
forall peeraddr txid tx.
SharedTxState peeraddr txid tx -> SizeInBytes
inflightTxsSize SharedTxState peeraddr txid tx
st SizeInBytes -> SizeInBytes -> Bool
forall a. Ord a => a -> a -> Bool
>= SizeInBytes
requestedSize) (SizeInBytes -> SizeInBytes) -> SizeInBytes -> SizeInBytes
forall a b. (a -> b) -> a -> b
$
                            SharedTxState peeraddr txid tx -> SizeInBytes
forall peeraddr txid tx.
SharedTxState peeraddr txid tx -> SizeInBytes
inflightTxsSize SharedTxState peeraddr txid tx
st SizeInBytes -> SizeInBytes -> SizeInBytes
forall a. Num a => a -> a -> a
- SizeInBytes
requestedSize

        st' :: SharedTxState peeraddr txid tx
st' = SharedTxState peeraddr txid tx
st { inflightTxs     = inflightTxs'',
                   inflightTxsSize = inflightTxsSize''
                 }

        --
        -- Update PeerTxState
        --

        -- Remove the downloaded `txid`s from the availableTxIds map, this
        -- guarantees that we won't attempt to download the `txids` from this peer
        -- once we collect the `txid`s. Also restrict keys to `liveSet`.
        --
        -- NOTE: we could remove `notReceived` from `availableTxIds`; and
        -- possibly avoid using `unknownTxs` field at all.
        --
        availableTxIds'' :: Map txid SizeInBytes
availableTxIds'' = PeerTxState txid tx -> Map txid SizeInBytes
forall txid tx. PeerTxState txid tx -> Map txid SizeInBytes
availableTxIds PeerTxState txid tx
ps
                           Map txid SizeInBytes -> Set txid -> Map txid SizeInBytes
forall k a. Ord k => Map k a -> Set k -> Map k a
`Map.withoutKeys`
                           Set txid
requestedTxIds

        -- Remove all acknowledged `txid`s from unknown set, but only those
        -- which are not present in `unacknowledgedTxIds'`
        unknownTxs'' :: Set txid
unknownTxs'' = Set txid
unknownTxs'
                       Set txid -> Set txid -> Set txid
forall a. Ord a => Set a -> Set a -> Set a
`Set.intersection`
                       Set txid
live
          where
            -- We cannot use `liveSet` as `unknown <> notReceived` might
            -- contain `txids` which are in `liveSet` but are not `live`.
            live :: Set txid
live = [txid] -> Set txid
forall a. Ord a => [a] -> Set a
Set.fromList (StrictSeq txid -> [txid]
forall a. StrictSeq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (PeerTxState txid tx -> StrictSeq txid
forall txid tx. PeerTxState txid tx -> StrictSeq txid
unacknowledgedTxIds PeerTxState txid tx
ps))

        ps'' :: PeerTxState txid tx
ps'' = PeerTxState txid tx
ps { availableTxIds           = availableTxIds'',
                    unknownTxs               = unknownTxs'',
                    requestedTxsInflightSize = requestedTxsInflightSize',
                    requestedTxsInflight     = requestedTxsInflight',
                    downloadedTxs            = downloadedTxs' }

--
-- Monadic public API
--

type SharedTxStateVar m peeraddr txid tx = StrictTVar m (SharedTxState peeraddr txid tx)

newSharedTxStateVar :: MonadSTM m
                    => StdGen
                    -> m (SharedTxStateVar m peeraddr txid tx)
newSharedTxStateVar :: forall (m :: * -> *) peeraddr txid tx.
MonadSTM m =>
StdGen -> m (SharedTxStateVar m peeraddr txid tx)
newSharedTxStateVar StdGen
rng = SharedTxState peeraddr txid tx
-> m (StrictTVar m (SharedTxState peeraddr txid tx))
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO SharedTxState {
    peerTxStates :: Map peeraddr (PeerTxState txid tx)
peerTxStates             = Map peeraddr (PeerTxState txid tx)
forall k a. Map k a
Map.empty,
    inflightTxs :: Map txid Int
inflightTxs              = Map txid Int
forall k a. Map k a
Map.empty,
    inflightTxsSize :: SizeInBytes
inflightTxsSize          = SizeInBytes
0,
    bufferedTxs :: Map txid (Maybe tx)
bufferedTxs              = Map txid (Maybe tx)
forall k a. Map k a
Map.empty,
    referenceCounts :: Map txid Int
referenceCounts          = Map txid Int
forall k a. Map k a
Map.empty,
    timedTxs :: Map Time [txid]
timedTxs                 = Map Time [txid]
forall k a. Map k a
Map.empty,
    inSubmissionToMempoolTxs :: Map txid Int
inSubmissionToMempoolTxs = Map txid Int
forall k a. Map k a
Map.empty,
    peerRng :: StdGen
peerRng                  = StdGen
rng
  }


-- | Acknowledge `txid`s, return the number of `txids` to be acknowledged to the
-- remote side.
--
receivedTxIds
  :: forall m peeraddr idx tx txid.
     (MonadSTM m, Ord txid, Ord peeraddr)
  => Tracer m (TraceTxLogic peeraddr txid tx)
  -> SharedTxStateVar m peeraddr txid tx
  -> STM m (MempoolSnapshot txid tx idx)
  -> peeraddr
  -> NumTxIdsToReq
  -- ^ number of requests to subtract from
  -- `requestedTxIdsInflight`
  -> StrictSeq txid
  -- ^ sequence of received `txids`
  -> Map txid SizeInBytes
  -- ^ received `txid`s with sizes
  -> m ()
receivedTxIds :: forall (m :: * -> *) peeraddr idx tx txid.
(MonadSTM m, Ord txid, Ord peeraddr) =>
Tracer m (TraceTxLogic peeraddr txid tx)
-> SharedTxStateVar m peeraddr txid tx
-> STM m (MempoolSnapshot txid tx idx)
-> peeraddr
-> NumTxIdsToReq
-> StrictSeq txid
-> Map txid SizeInBytes
-> m ()
receivedTxIds Tracer m (TraceTxLogic peeraddr txid tx)
tracer SharedTxStateVar m peeraddr txid tx
sharedVar STM m (MempoolSnapshot txid tx idx)
getMempoolSnapshot peeraddr
peeraddr NumTxIdsToReq
reqNo StrictSeq txid
txidsSeq Map txid SizeInBytes
txidsMap = do
  st <- STM m (SharedTxState peeraddr txid tx)
-> m (SharedTxState peeraddr txid tx)
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (SharedTxState peeraddr txid tx)
 -> m (SharedTxState peeraddr txid tx))
-> STM m (SharedTxState peeraddr txid tx)
-> m (SharedTxState peeraddr txid tx)
forall a b. (a -> b) -> a -> b
$ do
    MempoolSnapshot{mempoolHasTx} <- STM m (MempoolSnapshot txid tx idx)
getMempoolSnapshot
    stateTVar sharedVar ((\SharedTxState peeraddr txid tx
a -> (SharedTxState peeraddr txid tx
a,SharedTxState peeraddr txid tx
a)) . receivedTxIdsImpl mempoolHasTx peeraddr reqNo txidsSeq txidsMap)
  traceWith tracer (TraceSharedTxState "receivedTxIds" st)


-- | Include received `tx`s in `SharedTxState`.  Return number of `txids`
-- to be acknowledged and list of `tx` to be added to the mempool.
--
collectTxs
  :: forall m peeraddr tx txid.
     (MonadSTM m, Ord txid, Ord peeraddr,
      Show txid, Typeable txid)
  => Tracer m (TraceTxLogic peeraddr txid tx)
  -> (tx -> SizeInBytes)
  -> SharedTxStateVar m peeraddr txid tx
  -> peeraddr
  -> Map txid SizeInBytes -- ^ set of requested txids with their announced size
  -> Map txid tx          -- ^ received txs
  -> m (Maybe TxSubmissionProtocolError)
  -- ^ number of txids to be acknowledged and txs to be added to the
  -- mempool
collectTxs :: forall (m :: * -> *) peeraddr tx txid.
(MonadSTM m, Ord txid, Ord peeraddr, Show txid, Typeable txid) =>
Tracer m (TraceTxLogic peeraddr txid tx)
-> (tx -> SizeInBytes)
-> SharedTxStateVar m peeraddr txid tx
-> peeraddr
-> Map txid SizeInBytes
-> Map txid tx
-> m (Maybe TxSubmissionProtocolError)
collectTxs Tracer m (TraceTxLogic peeraddr txid tx)
tracer tx -> SizeInBytes
txSize SharedTxStateVar m peeraddr txid tx
sharedVar peeraddr
peeraddr Map txid SizeInBytes
txidsRequested Map txid tx
txsMap = do
  r <- STM
  m
  (Either TxSubmissionProtocolError (SharedTxState peeraddr txid tx))
-> m (Either
        TxSubmissionProtocolError (SharedTxState peeraddr txid tx))
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM
   m
   (Either TxSubmissionProtocolError (SharedTxState peeraddr txid tx))
 -> m (Either
         TxSubmissionProtocolError (SharedTxState peeraddr txid tx)))
-> STM
     m
     (Either TxSubmissionProtocolError (SharedTxState peeraddr txid tx))
-> m (Either
        TxSubmissionProtocolError (SharedTxState peeraddr txid tx))
forall a b. (a -> b) -> a -> b
$ do
    st <- SharedTxStateVar m peeraddr txid tx
-> STM m (SharedTxState peeraddr txid tx)
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar SharedTxStateVar m peeraddr txid tx
sharedVar
    case collectTxsImpl txSize peeraddr txidsRequested txsMap st of
      r :: Either TxSubmissionProtocolError (SharedTxState peeraddr txid tx)
r@(Right SharedTxState peeraddr txid tx
st') -> SharedTxStateVar m peeraddr txid tx
-> SharedTxState peeraddr txid tx -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar SharedTxStateVar m peeraddr txid tx
sharedVar SharedTxState peeraddr txid tx
st'
                    STM m ()
-> Either
     TxSubmissionProtocolError (SharedTxState peeraddr txid tx)
-> STM
     m
     (Either TxSubmissionProtocolError (SharedTxState peeraddr txid tx))
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> Either TxSubmissionProtocolError (SharedTxState peeraddr txid tx)
r
      r :: Either TxSubmissionProtocolError (SharedTxState peeraddr txid tx)
r@Left {}     -> Either TxSubmissionProtocolError (SharedTxState peeraddr txid tx)
-> STM
     m
     (Either TxSubmissionProtocolError (SharedTxState peeraddr txid tx))
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Either TxSubmissionProtocolError (SharedTxState peeraddr txid tx)
r
  case r of
    Right SharedTxState peeraddr txid tx
st -> Tracer m (TraceTxLogic peeraddr txid tx)
-> TraceTxLogic peeraddr txid tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceTxLogic peeraddr txid tx)
tracer (String
-> SharedTxState peeraddr txid tx -> TraceTxLogic peeraddr txid tx
forall peeraddr txid tx.
String
-> SharedTxState peeraddr txid tx -> TraceTxLogic peeraddr txid tx
TraceSharedTxState String
"collectTxs" SharedTxState peeraddr txid tx
st)
             m ()
-> Maybe TxSubmissionProtocolError
-> m (Maybe TxSubmissionProtocolError)
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> Maybe TxSubmissionProtocolError
forall a. Maybe a
Nothing
    Left TxSubmissionProtocolError
e   -> Maybe TxSubmissionProtocolError
-> m (Maybe TxSubmissionProtocolError)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (TxSubmissionProtocolError -> Maybe TxSubmissionProtocolError
forall a. a -> Maybe a
Just TxSubmissionProtocolError
e)