{-# LANGUAGE BangPatterns        #-}
{-# LANGUAGE BlockArguments      #-}
{-# LANGUAGE DerivingVia         #-}
{-# LANGUAGE LambdaCase          #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections       #-}
{-# OPTIONS_GHC -Wno-unrecognised-pragmas #-}

module Ouroboros.Network.TxSubmission.Inbound.V2.Decision
  ( TxDecision (..)
  , emptyTxDecision
    -- * Internal API exposed for testing
  , makeDecisions
  , filterActivePeers
  , pickTxsToDownload
  ) where

import Control.Arrow ((>>>))
import Control.Exception (assert)

import Data.Bifunctor (second)
import Data.Hashable
import Data.List qualified as List
import Data.Map.Merge.Strict qualified as Map
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe (mapMaybe)
import Data.Set (Set)
import Data.Set qualified as Set
import GHC.Stack (HasCallStack)
import System.Random (random)

import Data.Sequence.Strict qualified as StrictSeq
import Ouroboros.Network.Protocol.TxSubmission2.Type
import Ouroboros.Network.TxSubmission.Inbound.V2.Policy
import Ouroboros.Network.TxSubmission.Inbound.V2.State
import Ouroboros.Network.TxSubmission.Inbound.V2.Types


-- | Make download decisions.
--
-- The computation proceeds in three steps:
--
-- 1. a fresh @salt@ is drawn from the random generator stored in the shared
--    state (`peerRng`); the advanced generator is written back into the
--    state which is handed to the next step, so that each invocation uses
--    a different salt;
-- 2. the available peers are put in order by `orderByRejections`, using the
--    salt as a tie breaker;
-- 3. `pickTxsToDownload` distributes the work among the ordered peers and the
--    resulting association list of decisions is turned into a `Map`.
--
makeDecisions
    :: forall peeraddr txid tx.
       ( Ord peeraddr
       , Ord txid
       , Hashable peeraddr
       )
    => TxDecisionPolicy
    -- ^ decision policy
    -> SharedTxState peeraddr txid tx
    -- ^ decision context
    -> Map peeraddr (PeerTxState txid tx)
    -- ^ list of available peers.
    --
    -- This is a subset of `peerTxStates` of peers which either:
    -- * can be used to download a `tx`,
    -- * can acknowledge some `txid`s.
    --
    -> ( SharedTxState peeraddr txid tx
       , Map peeraddr (TxDecision txid tx)
       )
makeDecisions policy st =
    let -- draw the per-invocation salt and remember the advanced generator
        (salt, rng') = random (peerRng st)
        st' = st { peerRng = rng' }
    in  fn
      . pickTxsToDownload policy st'
      . orderByRejections salt
  where
    -- Convert the list of per-peer decisions into a map, leaving the first
    -- component of the pair untouched.
    fn :: forall a.
          (a, [(peeraddr, TxDecision txid tx)])
       -> (a, Map peeraddr (TxDecision txid tx))
    fn (a, as) = (a, Map.fromList as)


-- | Order peers by how useful the TXs they have provided are.
--
-- TXs delivered late will fail to apply because they were included in
-- a recently adopted block. Peers can race against each other by setting
-- `txInflightMultiplicity` to > 1. In case of a tie a hash of the peeraddr
-- is used as a tie breaker. Since every invocation uses a new salt a given
-- peeraddr does not have an advantage over time.
--
-- The result is sorted in ascending order of the pair
-- @(score, hashWithSalt salt peeraddr)@, where `score` is read from the
-- peer's `PeerTxState`.
--
orderByRejections :: Hashable peeraddr
                  => Int
                  -- ^ salt used when hashing a `peeraddr` to break ties
                  -> Map peeraddr (PeerTxState txid tx)
                  -- ^ peers to order
                  -> [ (peeraddr, PeerTxState txid tx)]
orderByRejections salt =
        List.sortOn (\(peeraddr, ps) -> (score ps, hashWithSalt salt peeraddr))
      . Map.toList


-- | Internal state of `pickTxsToDownload` computation.
--
-- This is the accumulator which is threaded through the fold over peers.
--
data St peeraddr txid tx =
   St { stInflightSize             :: !SizeInBytes,
        -- ^ size of all `tx`s in-flight.

        stInflight                 :: !(Map txid Int),
        -- ^ `txid`s in-flight.

        stAcknowledged             :: !(Map txid Int),
        -- ^ acknowledged `txid` with multiplicities.  It is used to update
        -- `referenceCounts`.

        stInSubmissionToMempoolTxs :: Set txid
        -- ^ TXs on their way to the mempool. Used to prevent issuing new
        -- fetch requests for them.
      }


-- | Distribute `tx`'s to download among available peers.  Peers are considered
-- in the given order.
--
-- * pick txs from the set of available tx's (in `txid` order, note these sets
--   might be different for different peers).
-- * pick txs until the peer's in-flight limit (we can go over the limit by one tx)
--   (`txsSizeInflightPerPeer` limit)
-- * pick txs until the overall in-flight limit (we can go over the limit by one tx)
--   (`maxTxsSizeInflight` limit)
-- * each tx can be downloaded simultaneously from at most
--   `txInflightMultiplicity` peers.
--
pickTxsToDownload
  :: forall peeraddr txid tx.
     ( Ord peeraddr
     , Ord txid
     )
  => TxDecisionPolicy
  -- ^ decision policy
  -> SharedTxState peeraddr txid tx
  -- ^ shared state

  -> [(peeraddr, PeerTxState txid tx)]
  -> ( SharedTxState peeraddr txid tx
     , [(peeraddr, TxDecision txid tx)]
     )

pickTxsToDownload :: forall peeraddr txid tx.
(Ord peeraddr, Ord txid) =>
TxDecisionPolicy
-> SharedTxState peeraddr txid tx
-> [(peeraddr, PeerTxState txid tx)]
-> (SharedTxState peeraddr txid tx,
    [(peeraddr, TxDecision txid tx)])
pickTxsToDownload policy :: TxDecisionPolicy
policy@TxDecisionPolicy { SizeInBytes
txsSizeInflightPerPeer :: TxDecisionPolicy -> SizeInBytes
txsSizeInflightPerPeer :: SizeInBytes
txsSizeInflightPerPeer,
                                            SizeInBytes
maxTxsSizeInflight :: TxDecisionPolicy -> SizeInBytes
maxTxsSizeInflight :: SizeInBytes
maxTxsSizeInflight,
                                            Int
txInflightMultiplicity :: TxDecisionPolicy -> Int
txInflightMultiplicity :: Int
txInflightMultiplicity }
                  sharedState :: SharedTxState peeraddr txid tx
sharedState@SharedTxState { Map peeraddr (PeerTxState txid tx)
peerTxStates :: forall peeraddr txid tx.
SharedTxState peeraddr txid tx
-> Map peeraddr (PeerTxState txid tx)
peerTxStates :: Map peeraddr (PeerTxState txid tx)
peerTxStates,
                                              Map txid Int
inflightTxs :: Map txid Int
inflightTxs :: forall peeraddr txid tx.
SharedTxState peeraddr txid tx -> Map txid Int
inflightTxs,
                                              SizeInBytes
inflightTxsSize :: SizeInBytes
inflightTxsSize :: forall peeraddr txid tx.
SharedTxState peeraddr txid tx -> SizeInBytes
inflightTxsSize,
                                              Map txid (Maybe tx)
bufferedTxs :: Map txid (Maybe tx)
bufferedTxs :: forall peeraddr txid tx.
SharedTxState peeraddr txid tx -> Map txid (Maybe tx)
bufferedTxs,
                                              Map txid Int
inSubmissionToMempoolTxs :: Map txid Int
inSubmissionToMempoolTxs :: forall peeraddr txid tx.
SharedTxState peeraddr txid tx -> Map txid Int
inSubmissionToMempoolTxs,
                                              Map txid Int
referenceCounts :: forall peeraddr txid tx.
SharedTxState peeraddr txid tx -> Map txid Int
referenceCounts :: Map txid Int
referenceCounts } =
    -- outer fold: fold `[(peeraddr, PeerTxState txid tx)]`
    (St peeraddr txid tx
 -> (peeraddr, PeerTxState txid tx)
 -> (St peeraddr txid tx,
     ((peeraddr, PeerTxState txid tx), TxDecision txid tx)))
-> St peeraddr txid tx
-> [(peeraddr, PeerTxState txid tx)]
-> (St peeraddr txid tx,
    [((peeraddr, PeerTxState txid tx), TxDecision txid tx)])
forall (t :: * -> *) s a b.
Traversable t =>
(s -> a -> (s, b)) -> s -> t a -> (s, t b)
List.mapAccumR
      St peeraddr txid tx
-> (peeraddr, PeerTxState txid tx)
-> (St peeraddr txid tx,
    ((peeraddr, PeerTxState txid tx), TxDecision txid tx))
accumFn
      -- initial state
      St { stInflight :: Map txid Int
stInflight                 = Map txid Int
inflightTxs,
           stInflightSize :: SizeInBytes
stInflightSize             = SizeInBytes
inflightTxsSize,
           stAcknowledged :: Map txid Int
stAcknowledged             = Map txid Int
forall k a. Map k a
Map.empty,
           stInSubmissionToMempoolTxs :: Set txid
stInSubmissionToMempoolTxs = Map txid Int -> Set txid
forall k a. Map k a -> Set k
Map.keysSet Map txid Int
inSubmissionToMempoolTxs }

    ([(peeraddr, PeerTxState txid tx)]
 -> (St peeraddr txid tx,
     [((peeraddr, PeerTxState txid tx), TxDecision txid tx)]))
-> ((St peeraddr txid tx,
     [((peeraddr, PeerTxState txid tx), TxDecision txid tx)])
    -> (SharedTxState peeraddr txid tx,
        [(peeraddr, TxDecision txid tx)]))
-> [(peeraddr, PeerTxState txid tx)]
-> (SharedTxState peeraddr txid tx,
    [(peeraddr, TxDecision txid tx)])
forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>>
      (St peeraddr txid tx,
 [((peeraddr, PeerTxState txid tx), TxDecision txid tx)])
-> (SharedTxState peeraddr txid tx,
    [(peeraddr, TxDecision txid tx)])
gn
  where
    accumFn :: St peeraddr txid tx
            -> (peeraddr, PeerTxState txid tx)
            -> ( St peeraddr txid tx
               , ( (peeraddr, PeerTxState txid tx)
                 , TxDecision txid tx
                 )
               )
    accumFn :: St peeraddr txid tx
-> (peeraddr, PeerTxState txid tx)
-> (St peeraddr txid tx,
    ((peeraddr, PeerTxState txid tx), TxDecision txid tx))
accumFn
      st :: St peeraddr txid tx
st@St { Map txid Int
stInflight :: forall peeraddr txid tx. St peeraddr txid tx -> Map txid Int
stInflight :: Map txid Int
stInflight,
              SizeInBytes
stInflightSize :: forall peeraddr txid tx. St peeraddr txid tx -> SizeInBytes
stInflightSize :: SizeInBytes
stInflightSize,
              Map txid Int
stAcknowledged :: forall peeraddr txid tx. St peeraddr txid tx -> Map txid Int
stAcknowledged :: Map txid Int
stAcknowledged,
              Set txid
stInSubmissionToMempoolTxs :: forall peeraddr txid tx. St peeraddr txid tx -> Set txid
stInSubmissionToMempoolTxs :: Set txid
stInSubmissionToMempoolTxs }
      ( peeraddr
peeraddr
      , peerTxState :: PeerTxState txid tx
peerTxState@PeerTxState { Map txid SizeInBytes
availableTxIds :: Map txid SizeInBytes
availableTxIds :: forall txid tx. PeerTxState txid tx -> Map txid SizeInBytes
availableTxIds,
                                  Set txid
unknownTxs :: Set txid
unknownTxs :: forall txid tx. PeerTxState txid tx -> Set txid
unknownTxs,
                                  Set txid
requestedTxsInflight :: Set txid
requestedTxsInflight :: forall txid tx. PeerTxState txid tx -> Set txid
requestedTxsInflight,
                                  SizeInBytes
requestedTxsInflightSize :: SizeInBytes
requestedTxsInflightSize :: forall txid tx. PeerTxState txid tx -> SizeInBytes
requestedTxsInflightSize
                                }
      )
      =
      let sizeInflightAll   :: SizeInBytes
          sizeInflightOther :: SizeInBytes

          sizeInflightAll :: SizeInBytes
sizeInflightAll   = SizeInBytes
stInflightSize
          sizeInflightOther :: SizeInBytes
sizeInflightOther = SizeInBytes
sizeInflightAll SizeInBytes -> SizeInBytes -> SizeInBytes
forall a. Num a => a -> a -> a
- SizeInBytes
requestedTxsInflightSize

      in if SizeInBytes
sizeInflightAll SizeInBytes -> SizeInBytes -> Bool
forall a. Ord a => a -> a -> Bool
>= SizeInBytes
maxTxsSizeInflight
        then let ( NumTxIdsToAck
numTxIdsToAck
                   , NumTxIdsToReq
numTxIdsToReq
                   , txsToMempool :: TxsToMempool txid tx
txsToMempool@TxsToMempool { [(txid, tx)]
listOfTxsToMempool :: [(txid, tx)]
listOfTxsToMempool :: forall txid tx. TxsToMempool txid tx -> [(txid, tx)]
listOfTxsToMempool }
                   , RefCountDiff { Map txid Int
txIdsToAck :: Map txid Int
txIdsToAck :: forall txid. RefCountDiff txid -> Map txid Int
txIdsToAck }
                   , PeerTxState txid tx
peerTxState'
                   ) = TxDecisionPolicy
-> SharedTxState peeraddr txid tx
-> PeerTxState txid tx
-> (NumTxIdsToAck, NumTxIdsToReq, TxsToMempool txid tx,
    RefCountDiff txid, PeerTxState txid tx)
forall peeraddr tx txid.
(Ord txid, HasCallStack) =>
TxDecisionPolicy
-> SharedTxState peeraddr txid tx
-> PeerTxState txid tx
-> (NumTxIdsToAck, NumTxIdsToReq, TxsToMempool txid tx,
    RefCountDiff txid, PeerTxState txid tx)
acknowledgeTxIds TxDecisionPolicy
policy SharedTxState peeraddr txid tx
sharedState PeerTxState txid tx
peerTxState

                 stAcknowledged' :: Map txid Int
stAcknowledged' = (Int -> Int -> Int) -> Map txid Int -> Map txid Int -> Map txid Int
forall k a. Ord k => (a -> a -> a) -> Map k a -> Map k a -> Map k a
Map.unionWith Int -> Int -> Int
forall a. Num a => a -> a -> a
(+) Map txid Int
stAcknowledged Map txid Int
txIdsToAck
                 stInSubmissionToMempoolTxs' :: Set txid
stInSubmissionToMempoolTxs' = Set txid
stInSubmissionToMempoolTxs
                                            Set txid -> Set txid -> Set txid
forall a. Semigroup a => a -> a -> a
<> [txid] -> Set txid
forall a. Ord a => [a] -> Set a
Set.fromList (((txid, tx) -> txid) -> [(txid, tx)] -> [txid]
forall a b. (a -> b) -> [a] -> [b]
map (txid, tx) -> txid
forall a b. (a, b) -> a
fst [(txid, tx)]
listOfTxsToMempool)
             in
             if PeerTxState txid tx -> NumTxIdsToReq
forall txid tx. PeerTxState txid tx -> NumTxIdsToReq
requestedTxIdsInflight PeerTxState txid tx
peerTxState' NumTxIdsToReq -> NumTxIdsToReq -> Bool
forall a. Ord a => a -> a -> Bool
> NumTxIdsToReq
0
               then
                 -- we have txids to request
                 ( St peeraddr txid tx
st { stAcknowledged             = stAcknowledged'
                      , stInSubmissionToMempoolTxs = stInSubmissionToMempoolTxs' }
                 , ( (peeraddr
peeraddr, PeerTxState txid tx
peerTxState')
                     , TxDecision { txdTxIdsToAcknowledge :: NumTxIdsToAck
txdTxIdsToAcknowledge = NumTxIdsToAck
numTxIdsToAck,
                                    txdTxIdsToRequest :: NumTxIdsToReq
txdTxIdsToRequest     = NumTxIdsToReq
numTxIdsToReq,
                                    txdPipelineTxIds :: Bool
txdPipelineTxIds      = Bool -> Bool
not
                                                          (Bool -> Bool)
-> (PeerTxState txid tx -> Bool) -> PeerTxState txid tx -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StrictSeq txid -> Bool
forall a. StrictSeq a -> Bool
StrictSeq.null
                                                          (StrictSeq txid -> Bool)
-> (PeerTxState txid tx -> StrictSeq txid)
-> PeerTxState txid tx
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PeerTxState txid tx -> StrictSeq txid
forall txid tx. PeerTxState txid tx -> StrictSeq txid
unacknowledgedTxIds
                                                          (PeerTxState txid tx -> Bool) -> PeerTxState txid tx -> Bool
forall a b. (a -> b) -> a -> b
$ PeerTxState txid tx
peerTxState',
                                    txdTxsToRequest :: Map txid SizeInBytes
txdTxsToRequest       = Map txid SizeInBytes
forall k a. Map k a
Map.empty,
                                    txdTxsToMempool :: TxsToMempool txid tx
txdTxsToMempool       = TxsToMempool txid tx
txsToMempool
                                  }
                     )
                 )
               else
                 -- there are no `txid`s to request, nor we can request `tx`s due
                 -- to in-flight size limits
                 ( St peeraddr txid tx
st
                 , ( (peeraddr
peeraddr, PeerTxState txid tx
peerTxState')
                   , TxDecision txid tx
forall txid tx. TxDecision txid tx
emptyTxDecision
                   )
                 )
        else
          let requestedTxsInflightSize' :: SizeInBytes
              txsToRequestMap :: Map txid SizeInBytes

              (SizeInBytes
requestedTxsInflightSize', Map txid SizeInBytes
txsToRequestMap) =
                -- inner fold: fold available `txid`s
                --
                -- Note: although `Map.foldrWithKey` could be used here, it
                -- does not allow to short circuit the fold, unlike
                -- `foldWithState`.
                ((txid, (SizeInBytes, Int))
 -> SizeInBytes -> Maybe (SizeInBytes, (txid, SizeInBytes)))
-> [(txid, (SizeInBytes, Int))]
-> SizeInBytes
-> (SizeInBytes, Map txid SizeInBytes)
forall s a b c.
Ord b =>
(a -> s -> Maybe (s, (b, c))) -> [a] -> s -> (s, Map b c)
foldWithState
                  (\(txid
txid, (SizeInBytes
txSize, Int
inflightMultiplicity)) SizeInBytes
sizeInflight ->
                    if -- note that we pick `txid`'s as long the `s` is
                       -- smaller or equal to `txsSizeInflightPerPeer`.
                       SizeInBytes
sizeInflight SizeInBytes -> SizeInBytes -> Bool
forall a. Ord a => a -> a -> Bool
<= SizeInBytes
txsSizeInflightPerPeer
                       -- overall `tx`'s in-flight must be smaller than
                       -- `maxTxsSizeInflight`
                    Bool -> Bool -> Bool
&& SizeInBytes
sizeInflight SizeInBytes -> SizeInBytes -> SizeInBytes
forall a. Num a => a -> a -> a
+ SizeInBytes
sizeInflightOther SizeInBytes -> SizeInBytes -> Bool
forall a. Ord a => a -> a -> Bool
<= SizeInBytes
maxTxsSizeInflight
                       -- the transaction must not be downloaded from more
                       -- than `txInflightMultiplicity` peers simultaneously
                    Bool -> Bool -> Bool
&& Int
inflightMultiplicity Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
txInflightMultiplicity
                    -- TODO: we must validate that `txSize` is smaller than
                    -- maximum txs size
                    then (SizeInBytes, (txid, SizeInBytes))
-> Maybe (SizeInBytes, (txid, SizeInBytes))
forall a. a -> Maybe a
Just (SizeInBytes
sizeInflight SizeInBytes -> SizeInBytes -> SizeInBytes
forall a. Num a => a -> a -> a
+ SizeInBytes
txSize, (txid
txid, SizeInBytes
txSize))
                    else Maybe (SizeInBytes, (txid, SizeInBytes))
forall a. Maybe a
Nothing
                  )
                  (Map txid (SizeInBytes, Int) -> [(txid, (SizeInBytes, Int))]
forall k a. Map k a -> [(k, a)]
Map.assocs (Map txid (SizeInBytes, Int) -> [(txid, (SizeInBytes, Int))])
-> Map txid (SizeInBytes, Int) -> [(txid, (SizeInBytes, Int))]
forall a b. (a -> b) -> a -> b
$
                    -- merge `availableTxIds` with `stInflight`, so we don't
                    -- need to lookup into `stInflight` on every `txid` which
                    -- is in `availableTxIds`.
                    SimpleWhenMissing txid SizeInBytes (SizeInBytes, Int)
-> SimpleWhenMissing txid Int (SizeInBytes, Int)
-> SimpleWhenMatched txid SizeInBytes Int (SizeInBytes, Int)
-> Map txid SizeInBytes
-> Map txid Int
-> Map txid (SizeInBytes, Int)
forall k a c b.
Ord k =>
SimpleWhenMissing k a c
-> SimpleWhenMissing k b c
-> SimpleWhenMatched k a b c
-> Map k a
-> Map k b
-> Map k c
Map.merge ((txid -> SizeInBytes -> Maybe (SizeInBytes, Int))
-> SimpleWhenMissing txid SizeInBytes (SizeInBytes, Int)
forall (f :: * -> *) k x y.
Applicative f =>
(k -> x -> Maybe y) -> WhenMissing f k x y
Map.mapMaybeMissing \txid
_txid -> (SizeInBytes, Int) -> Maybe (SizeInBytes, Int)
forall a. a -> Maybe a
Just ((SizeInBytes, Int) -> Maybe (SizeInBytes, Int))
-> (SizeInBytes -> (SizeInBytes, Int))
-> SizeInBytes
-> Maybe (SizeInBytes, Int)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (,Int
0))
                               SimpleWhenMissing txid Int (SizeInBytes, Int)
forall (f :: * -> *) k x y. Applicative f => WhenMissing f k x y
Map.dropMissing
                              ((txid -> SizeInBytes -> Int -> (SizeInBytes, Int))
-> SimpleWhenMatched txid SizeInBytes Int (SizeInBytes, Int)
forall (f :: * -> *) k x y z.
Applicative f =>
(k -> x -> y -> z) -> WhenMatched f k x y z
Map.zipWithMatched \txid
_txid -> (,))

                              Map txid SizeInBytes
availableTxIds
                              Map txid Int
stInflight
                    -- remove `tx`s which were already downloaded by some
                    -- other peer or are in-flight or unknown by this peer.
                    Map txid (SizeInBytes, Int)
-> Set txid -> Map txid (SizeInBytes, Int)
forall k a. Ord k => Map k a -> Set k -> Map k a
`Map.withoutKeys`
                    (Map txid (Maybe tx) -> Set txid
forall k a. Map k a -> Set k
Map.keysSet Map txid (Maybe tx)
bufferedTxs Set txid -> Set txid -> Set txid
forall a. Semigroup a => a -> a -> a
<> Set txid
requestedTxsInflight Set txid -> Set txid -> Set txid
forall a. Semigroup a => a -> a -> a
<> Set txid
unknownTxs
                        Set txid -> Set txid -> Set txid
forall a. Semigroup a => a -> a -> a
<> Set txid
stInSubmissionToMempoolTxs)

                  )
                  SizeInBytes
requestedTxsInflightSize
                  -- pick from `txid`'s which are available from that given
                  -- peer.  Since we are folding a dictionary each `txid`
                  -- will be selected only once from a given peer (at least
                  -- in each round).

              txsToRequest :: Set txid
txsToRequest = Map txid SizeInBytes -> Set txid
forall k a. Map k a -> Set k
Map.keysSet Map txid SizeInBytes
txsToRequestMap
              peerTxState' :: PeerTxState txid tx
peerTxState' = PeerTxState txid tx
peerTxState {
                  requestedTxsInflightSize = requestedTxsInflightSize',
                  requestedTxsInflight     = requestedTxsInflight
                                          <> txsToRequest
                }

              ( NumTxIdsToAck
numTxIdsToAck
                , NumTxIdsToReq
numTxIdsToReq
                , txsToMempool :: TxsToMempool txid tx
txsToMempool@TxsToMempool { [(txid, tx)]
listOfTxsToMempool :: forall txid tx. TxsToMempool txid tx -> [(txid, tx)]
listOfTxsToMempool :: [(txid, tx)]
listOfTxsToMempool }
                , RefCountDiff { Map txid Int
txIdsToAck :: forall txid. RefCountDiff txid -> Map txid Int
txIdsToAck :: Map txid Int
txIdsToAck }
                , PeerTxState txid tx
peerTxState''
                ) = TxDecisionPolicy
-> SharedTxState peeraddr txid tx
-> PeerTxState txid tx
-> (NumTxIdsToAck, NumTxIdsToReq, TxsToMempool txid tx,
    RefCountDiff txid, PeerTxState txid tx)
forall peeraddr tx txid.
(Ord txid, HasCallStack) =>
TxDecisionPolicy
-> SharedTxState peeraddr txid tx
-> PeerTxState txid tx
-> (NumTxIdsToAck, NumTxIdsToReq, TxsToMempool txid tx,
    RefCountDiff txid, PeerTxState txid tx)
acknowledgeTxIds TxDecisionPolicy
policy SharedTxState peeraddr txid tx
sharedState PeerTxState txid tx
peerTxState'

              stAcknowledged' :: Map txid Int
stAcknowledged' = (Int -> Int -> Int) -> Map txid Int -> Map txid Int -> Map txid Int
forall k a. Ord k => (a -> a -> a) -> Map k a -> Map k a -> Map k a
Map.unionWith Int -> Int -> Int
forall a. Num a => a -> a -> a
(+) Map txid Int
stAcknowledged Map txid Int
txIdsToAck

              stInflightDelta :: Map txid Int
              stInflightDelta :: Map txid Int
stInflightDelta = (txid -> Int) -> Set txid -> Map txid Int
forall k a. (k -> a) -> Set k -> Map k a
Map.fromSet (\txid
_ -> Int
1) Set txid
txsToRequest
                                -- note: this is right since every `txid`
                                -- could be picked at most once

              stInflight' :: Map txid Int
              stInflight' :: Map txid Int
stInflight' = (Int -> Int -> Int) -> Map txid Int -> Map txid Int -> Map txid Int
forall k a. Ord k => (a -> a -> a) -> Map k a -> Map k a -> Map k a
Map.unionWith Int -> Int -> Int
forall a. Num a => a -> a -> a
(+) Map txid Int
stInflightDelta Map txid Int
stInflight

              stInSubmissionToMempoolTxs' :: Set txid
stInSubmissionToMempoolTxs' = Set txid
stInSubmissionToMempoolTxs
                                         Set txid -> Set txid -> Set txid
forall a. Semigroup a => a -> a -> a
<> [txid] -> Set txid
forall a. Ord a => [a] -> Set a
Set.fromList (((txid, tx) -> txid) -> [(txid, tx)] -> [txid]
forall a b. (a -> b) -> [a] -> [b]
map (txid, tx) -> txid
forall a b. (a, b) -> a
fst [(txid, tx)]
listOfTxsToMempool)
          in
            if PeerTxState txid tx -> NumTxIdsToReq
forall txid tx. PeerTxState txid tx -> NumTxIdsToReq
requestedTxIdsInflight PeerTxState txid tx
peerTxState'' NumTxIdsToReq -> NumTxIdsToReq -> Bool
forall a. Ord a => a -> a -> Bool
> NumTxIdsToReq
0
              then
                -- we can request `txid`s & `tx`s
                ( St { stInflight :: Map txid Int
stInflight                 = Map txid Int
stInflight',
                       stInflightSize :: SizeInBytes
stInflightSize             = SizeInBytes
sizeInflightOther SizeInBytes -> SizeInBytes -> SizeInBytes
forall a. Num a => a -> a -> a
+ SizeInBytes
requestedTxsInflightSize',
                       stAcknowledged :: Map txid Int
stAcknowledged             = Map txid Int
stAcknowledged',
                       stInSubmissionToMempoolTxs :: Set txid
stInSubmissionToMempoolTxs = Set txid
stInSubmissionToMempoolTxs' }
                , ( (peeraddr
peeraddr, PeerTxState txid tx
peerTxState'')
                  , TxDecision { txdTxIdsToAcknowledge :: NumTxIdsToAck
txdTxIdsToAcknowledge = NumTxIdsToAck
numTxIdsToAck,
                                 txdPipelineTxIds :: Bool
txdPipelineTxIds      = Bool -> Bool
not
                                                       (Bool -> Bool)
-> (PeerTxState txid tx -> Bool) -> PeerTxState txid tx -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StrictSeq txid -> Bool
forall a. StrictSeq a -> Bool
StrictSeq.null
                                                       (StrictSeq txid -> Bool)
-> (PeerTxState txid tx -> StrictSeq txid)
-> PeerTxState txid tx
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PeerTxState txid tx -> StrictSeq txid
forall txid tx. PeerTxState txid tx -> StrictSeq txid
unacknowledgedTxIds
                                                       (PeerTxState txid tx -> Bool) -> PeerTxState txid tx -> Bool
forall a b. (a -> b) -> a -> b
$ PeerTxState txid tx
peerTxState'',
                                 txdTxIdsToRequest :: NumTxIdsToReq
txdTxIdsToRequest     = NumTxIdsToReq
numTxIdsToReq,
                                 txdTxsToRequest :: Map txid SizeInBytes
txdTxsToRequest       = Map txid SizeInBytes
txsToRequestMap,
                                 txdTxsToMempool :: TxsToMempool txid tx
txdTxsToMempool       = TxsToMempool txid tx
txsToMempool
                               }
                  )
                )
              else
                -- there are no `txid`s to request, only `tx`s.
                ( St peeraddr txid tx
st { stInflight                 = stInflight',
                       stInflightSize             = sizeInflightOther + requestedTxsInflightSize',
                       stInSubmissionToMempoolTxs = stInSubmissionToMempoolTxs'
                     }
                , ( (peeraddr
peeraddr, PeerTxState txid tx
peerTxState'')
                  , TxDecision txid tx
forall txid tx. TxDecision txid tx
emptyTxDecision { txdTxsToRequest = txsToRequestMap }
                  )
                )

    gn :: ( St peeraddr txid tx
          , [((peeraddr, PeerTxState txid tx), TxDecision txid tx)]
          )
       -> ( SharedTxState peeraddr txid tx
          , [(peeraddr, TxDecision txid tx)]
          )
    gn :: (St peeraddr txid tx,
 [((peeraddr, PeerTxState txid tx), TxDecision txid tx)])
-> (SharedTxState peeraddr txid tx,
    [(peeraddr, TxDecision txid tx)])
gn
      ( St { Map txid Int
stInflight :: forall peeraddr txid tx. St peeraddr txid tx -> Map txid Int
stInflight :: Map txid Int
stInflight,
             SizeInBytes
stInflightSize :: forall peeraddr txid tx. St peeraddr txid tx -> SizeInBytes
stInflightSize :: SizeInBytes
stInflightSize,
             Map txid Int
stAcknowledged :: forall peeraddr txid tx. St peeraddr txid tx -> Map txid Int
stAcknowledged :: Map txid Int
stAcknowledged }
      , [((peeraddr, PeerTxState txid tx), TxDecision txid tx)]
as
      )
      =
      let peerTxStates' :: Map peeraddr (PeerTxState txid tx)
peerTxStates' = [(peeraddr, PeerTxState txid tx)]
-> Map peeraddr (PeerTxState txid tx)
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList ((\((peeraddr, PeerTxState txid tx)
a,TxDecision txid tx
_) -> (peeraddr, PeerTxState txid tx)
a) (((peeraddr, PeerTxState txid tx), TxDecision txid tx)
 -> (peeraddr, PeerTxState txid tx))
-> [((peeraddr, PeerTxState txid tx), TxDecision txid tx)]
-> [(peeraddr, PeerTxState txid tx)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [((peeraddr, PeerTxState txid tx), TxDecision txid tx)]
as)
                       Map peeraddr (PeerTxState txid tx)
-> Map peeraddr (PeerTxState txid tx)
-> Map peeraddr (PeerTxState txid tx)
forall a. Semigroup a => a -> a -> a
<> Map peeraddr (PeerTxState txid tx)
peerTxStates

          referenceCounts' :: Map txid Int
referenceCounts' =
            SimpleWhenMissing txid Int Int
-> SimpleWhenMissing txid Int Int
-> SimpleWhenMatched txid Int Int Int
-> Map txid Int
-> Map txid Int
-> Map txid Int
forall k a c b.
Ord k =>
SimpleWhenMissing k a c
-> SimpleWhenMissing k b c
-> SimpleWhenMatched k a b c
-> Map k a
-> Map k b
-> Map k c
Map.merge ((txid -> Int -> Maybe Int) -> SimpleWhenMissing txid Int Int
forall (f :: * -> *) k x y.
Applicative f =>
(k -> x -> Maybe y) -> WhenMissing f k x y
Map.mapMaybeMissing \txid
_ Int
x -> Int -> Maybe Int
forall a. a -> Maybe a
Just Int
x)
                      ((txid -> Int -> Maybe Int) -> SimpleWhenMissing txid Int Int
forall (f :: * -> *) k x y.
Applicative f =>
(k -> x -> Maybe y) -> WhenMissing f k x y
Map.mapMaybeMissing \txid
_ Int
_ -> Bool -> Maybe Int -> Maybe Int
forall a. HasCallStack => Bool -> a -> a
assert Bool
False Maybe Int
forall a. Maybe a
Nothing)
                      ((txid -> Int -> Int -> Maybe Int)
-> SimpleWhenMatched txid Int Int Int
forall (f :: * -> *) k x y z.
Applicative f =>
(k -> x -> y -> Maybe z) -> WhenMatched f k x y z
Map.zipWithMaybeMatched \txid
_ Int
x Int
y -> if Int
x Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
y then Int -> Maybe Int
forall a. a -> Maybe a
Just (Int -> Maybe Int) -> Int -> Maybe Int
forall a b. (a -> b) -> a -> b
$! Int
x Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
y
                                                                  else Maybe Int
forall a. Maybe a
Nothing)
                      Map txid Int
referenceCounts
                      Map txid Int
stAcknowledged

          liveSet :: Set txid
liveSet = Map txid Int -> Set txid
forall k a. Map k a -> Set k
Map.keysSet Map txid Int
referenceCounts'

          bufferedTxs' :: Map txid (Maybe tx)
bufferedTxs' = Map txid (Maybe tx)
bufferedTxs
                         Map txid (Maybe tx) -> Set txid -> Map txid (Maybe tx)
forall k a. Ord k => Map k a -> Set k -> Map k a
`Map.restrictKeys`
                         Set txid
liveSet

          inSubmissionToMempoolTxs' :: Map txid Int
inSubmissionToMempoolTxs' =
            (Map txid Int
 -> ((peeraddr, PeerTxState txid tx), TxDecision txid tx)
 -> Map txid Int)
-> Map txid Int
-> [((peeraddr, PeerTxState txid tx), TxDecision txid tx)]
-> Map txid Int
forall b a. (b -> a -> b) -> b -> [a] -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
List.foldl' Map txid Int
-> ((peeraddr, PeerTxState txid tx), TxDecision txid tx)
-> Map txid Int
forall a. Map txid Int -> (a, TxDecision txid tx) -> Map txid Int
updateInSubmissionToMempoolTxs Map txid Int
inSubmissionToMempoolTxs [((peeraddr, PeerTxState txid tx), TxDecision txid tx)]
as

      in ( SharedTxState peeraddr txid tx
sharedState {
             peerTxStates             = peerTxStates',
             inflightTxs              = stInflight,
             inflightTxsSize          = stInflightSize,
             bufferedTxs              = bufferedTxs',
             referenceCounts          = referenceCounts',
             inSubmissionToMempoolTxs = inSubmissionToMempoolTxs'}
         , -- exclude empty results
           (((peeraddr, PeerTxState txid tx), TxDecision txid tx)
 -> Maybe (peeraddr, TxDecision txid tx))
-> [((peeraddr, PeerTxState txid tx), TxDecision txid tx)]
-> [(peeraddr, TxDecision txid tx)]
forall a b. (a -> Maybe b) -> [a] -> [b]
mapMaybe (\((peeraddr
a, PeerTxState txid tx
_), TxDecision txid tx
b) -> case TxDecision txid tx
b of
                      TxDecision { txdTxIdsToAcknowledge :: forall txid tx. TxDecision txid tx -> NumTxIdsToAck
txdTxIdsToAcknowledge = NumTxIdsToAck
0,
                                   txdTxIdsToRequest :: forall txid tx. TxDecision txid tx -> NumTxIdsToReq
txdTxIdsToRequest     = NumTxIdsToReq
0,
                                   Map txid SizeInBytes
txdTxsToRequest :: forall txid tx. TxDecision txid tx -> Map txid SizeInBytes
txdTxsToRequest :: Map txid SizeInBytes
txdTxsToRequest,
                                   txdTxsToMempool :: forall txid tx. TxDecision txid tx -> TxsToMempool txid tx
txdTxsToMempool = TxsToMempool { [(txid, tx)]
listOfTxsToMempool :: forall txid tx. TxsToMempool txid tx -> [(txid, tx)]
listOfTxsToMempool :: [(txid, tx)]
listOfTxsToMempool } }
                                 | Map txid SizeInBytes -> Bool
forall a. Map txid a -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null Map txid SizeInBytes
txdTxsToRequest
                                 , [(txid, tx)] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [(txid, tx)]
listOfTxsToMempool
                                 -> Maybe (peeraddr, TxDecision txid tx)
forall a. Maybe a
Nothing
                      TxDecision txid tx
_          -> (peeraddr, TxDecision txid tx)
-> Maybe (peeraddr, TxDecision txid tx)
forall a. a -> Maybe a
Just (peeraddr
a, TxDecision txid tx
b)
                    )
                    [((peeraddr, PeerTxState txid tx), TxDecision txid tx)]
as
         )

      where
        updateInSubmissionToMempoolTxs
          :: forall a.
             Map txid Int
          -> (a, TxDecision txid tx)
          -> Map txid Int
        updateInSubmissionToMempoolTxs :: forall a. Map txid Int -> (a, TxDecision txid tx) -> Map txid Int
updateInSubmissionToMempoolTxs Map txid Int
m (a
_,TxDecision { TxsToMempool txid tx
txdTxsToMempool :: forall txid tx. TxDecision txid tx -> TxsToMempool txid tx
txdTxsToMempool :: TxsToMempool txid tx
txdTxsToMempool } ) =
            (Map txid Int -> (txid, tx) -> Map txid Int)
-> Map txid Int -> [(txid, tx)] -> Map txid Int
forall b a. (b -> a -> b) -> b -> [a] -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
List.foldl' Map txid Int -> (txid, tx) -> Map txid Int
fn Map txid Int
m (TxsToMempool txid tx -> [(txid, tx)]
forall txid tx. TxsToMempool txid tx -> [(txid, tx)]
listOfTxsToMempool TxsToMempool txid tx
txdTxsToMempool)
          where
            fn :: Map txid Int
               -> (txid,tx)
               -> Map txid Int
            fn :: Map txid Int -> (txid, tx) -> Map txid Int
fn Map txid Int
x (txid
txid,tx
_) = (Maybe Int -> Maybe Int) -> txid -> Map txid Int -> Map txid Int
forall k a.
Ord k =>
(Maybe a -> Maybe a) -> k -> Map k a -> Map k a
Map.alter (\case Maybe Int
Nothing -> Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1
                                             Just Int
n  -> Int -> Maybe Int
forall a. a -> Maybe a
Just (Int -> Maybe Int) -> Int -> Maybe Int
forall a b. (a -> b) -> a -> b
$! Int -> Int
forall a. Enum a => a -> a
succ Int
n) txid
txid Map txid Int
x


-- | Filter peers which can either download a `tx` or acknowledge `txid`s.
--
-- The result is the subset of `peerTxStates` selected by one of two
-- predicates:
--
-- * when the shared `inflightTxsSize` exceeds `maxTxsSizeInflight`, no more
--   `tx`s can be downloaded, so only the `txid`-request condition (`fn`) is
--   checked;
-- * otherwise a peer is also kept if it is within its per-peer in-flight size
--   limit and has at least one downloadable `txid` (`gn`).
--
filterActivePeers
    :: forall peeraddr txid tx.
       Ord txid
    => HasCallStack
    => TxDecisionPolicy
    -> SharedTxState peeraddr txid tx
    -> Map peeraddr (PeerTxState txid tx)
filterActivePeers
    policy@TxDecisionPolicy {
      maxUnacknowledgedTxIds,
      txsSizeInflightPerPeer,
      maxTxsSizeInflight,
      txInflightMultiplicity
    }
    sharedTxState@SharedTxState {
      peerTxStates,
      bufferedTxs,
      inflightTxs,
      inflightTxsSize,
      inSubmissionToMempoolTxs
    }
    | inflightTxsSize > maxTxsSizeInflight
      -- we might be able to request txids, we cannot download txs
    = Map.filter fn peerTxStates
    | otherwise
      -- we might be able to request txids or txs.
    = Map.filter gn peerTxStates
  where
    -- `txid`s which are excluded from the downloadable set of every peer:
    -- those already in-flight at least `txInflightMultiplicity` times and
    -- those which have an entry in `bufferedTxs`.
    unrequestable :: Set txid
    unrequestable = Map.keysSet (Map.filter (>= txInflightMultiplicity) inflightTxs)
                 <> Map.keysSet bufferedTxs

    -- Predicate used when no more `tx`s can be downloaded: the peer is kept
    -- only if more `txid`s can be requested from it.
    fn :: PeerTxState txid tx -> Bool
    fn peerTxState@PeerTxState {
        requestedTxIdsInflight
       } =
           requestedTxIdsInflight == 0
           -- if a peer has txids in-flight, we cannot request more txids or txs.
           --
           -- NOTE(review): once the check above holds,
           -- `requestedTxIdsInflight` contributes `0` to the sum below.
        && requestedTxIdsInflight + numOfUnacked <= maxUnacknowledgedTxIds
        && txIdsToRequest > 0
      where
        -- Split `unacknowledgedTxIds` into the longest prefix of `txid`s which
        -- can be acknowledged and the remaining unacknowledged `txid`s; the
        -- first component is the number of `txid`s to request.
        (txIdsToRequest, _, unackedTxIds) = splitAcknowledgedTxIds policy sharedTxState peerTxState
        -- NOTE(review): this counts only the suffix returned by
        -- `splitAcknowledgedTxIds`, whereas `gn` counts all of the peer's
        -- `unacknowledgedTxIds` -- confirm the asymmetry is intended.
        numOfUnacked = fromIntegral (StrictSeq.length unackedTxIds)

    -- Predicate used when `tx`s can still be downloaded: the peer is kept if
    -- more `txid`s can be requested from it, or if it has something to
    -- download while being within `txsSizeInflightPerPeer`.
    gn :: PeerTxState txid tx -> Bool
    gn peerTxState@PeerTxState { unacknowledgedTxIds,
                                 requestedTxIdsInflight,
                                 requestedTxsInflight,
                                 requestedTxsInflightSize,
                                 availableTxIds,
                                 unknownTxs
                               } =
          (    requestedTxIdsInflight == 0
            && requestedTxIdsInflight + numOfUnacked <= maxUnacknowledgedTxIds
            && txIdsToRequest > 0
          )
        || (underSizeLimit && not (Map.null downloadable))
      where
        numOfUnacked   = fromIntegral (StrictSeq.length unacknowledgedTxIds)
        underSizeLimit = requestedTxsInflightSize <= txsSizeInflightPerPeer
        -- `txid`s advertised by this peer, minus the ones already requested
        -- from it, the ones in its `unknownTxs` set, the globally
        -- `unrequestable` ones and the ones being submitted to the mempool.
        downloadable   = availableTxIds
            `Map.withoutKeys` requestedTxsInflight
            `Map.withoutKeys` unknownTxs
            `Map.withoutKeys` unrequestable
            `Map.withoutKeys` Map.keysSet inSubmissionToMempoolTxs

        -- Only the number of `txid`s to request is needed here; the
        -- acknowledged / unacknowledged split is ignored.
        (txIdsToRequest, _, _) = splitAcknowledgedTxIds policy sharedTxState peerTxState

--
-- Auxiliary functions
--

-- | A fold with state implemented as a `foldr` to take advantage of fold-build
-- fusion optimisation.
--
-- The list is traversed from left to right while the state @s@ is threaded
-- through @f@:
--
-- * @Just (s', (b, c))@ adds the entry @b -> c@ to the resulting map and
--   continues with the state @s'@;
-- * the first @Nothing@ stops the traversal: the current state is returned
--   and the remaining elements are not passed to @f@.
--
-- If the same key is produced more than once, the entry coming from the
-- earliest list element is the one kept in the map.
--
foldWithState
  :: forall s a b c.
     Ord b
  => (a -> s -> Maybe (s, (b, c)))
  -> [a] -> s -> (s, Map b c)
{-# INLINE foldWithState #-}

foldWithState f = foldr cons nil
  where
    -- One step of the fold: `k` is the continuation which processes the rest
    -- of the list starting from the updated state.
    cons :: a
         -> (s -> (s, Map b c))
         -> (s -> (s, Map b c))
    cons a k = \ !s ->
      case f a s of
        Nothing -> nil s
        Just (!s', (!b, !c)) ->
          -- the entry is inserted after the tail was processed, hence it
          -- replaces any entry with the same key produced by a later element
          case Map.insert b c `second` k s' of
            -- force both components of the result pair
            r@(!_s, !_bs) -> r

    -- End of the fold: return the state reached so far and an empty map.
    nil :: s -> (s, Map b c)
    nil = \ !s -> (s, Map.empty)