{-# LANGUAGE BangPatterns        #-}
{-# LANGUAGE BlockArguments      #-}
{-# LANGUAGE LambdaCase          #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Ouroboros.Network.TxSubmission.Inbound.V2.Registry
  ( TxChannels (..)
  , TxChannelsVar
  , TxMempoolSem
  , SharedTxStateVar
  , newSharedTxStateVar
  , newTxChannelsVar
  , newTxMempoolSem
  , PeerTxAPI (..)
  , decisionLogicThreads
  , withPeer
  ) where

import Control.Concurrent.Class.MonadMVar.Strict
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Concurrent.Class.MonadSTM.TSem
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadTimer.SI

import Data.Foldable as Foldable (foldl', traverse_)
import Data.Hashable
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe (fromMaybe)
import Data.Sequence.Strict (StrictSeq)
import Data.Sequence.Strict qualified as StrictSeq
import Data.Set qualified as Set
import Data.Typeable (Typeable)
import Data.Void (Void)

import Control.Tracer (Tracer, traceWith)
import Ouroboros.Network.Protocol.TxSubmission2.Type
import Ouroboros.Network.TxSubmission.Inbound.V2.Decision
import Ouroboros.Network.TxSubmission.Inbound.V2.Policy
import Ouroboros.Network.TxSubmission.Inbound.V2.State
import Ouroboros.Network.TxSubmission.Inbound.V2.Types
import Ouroboros.Network.TxSubmission.Mempool.Reader

-- | Communication channels between `TxSubmission` client mini-protocol and
-- decision logic.
--
newtype TxChannels m peeraddr txid tx = TxChannels {
      -- | one `StrictMVar` per peer, carrying that peer's `TxDecision`
      txChannelMap :: Map peeraddr (StrictMVar m (TxDecision txid tx))
    }

-- | A `StrictMVar` holding the `TxChannels` registry.
type TxChannelsVar m peeraddr txid tx = StrictMVar m (TxChannels m peeraddr txid tx)

-- | Create a `TxChannelsVar` which initially holds no per-peer channels.
--
newTxChannelsVar :: MonadMVar m => m (TxChannelsVar m peeraddr txid tx)
newTxChannelsVar = newMVar (TxChannels Map.empty)

-- | Newtype wrapper around a `TSem`.  `newTxMempoolSem` creates it with
-- a single unit; `submitTxToMempool` (in `withPeer`) waits on it before adding
-- a tx to the mempool and signals it afterwards.
newtype TxMempoolSem m = TxMempoolSem (TSem m)

-- | Create a `TxMempoolSem` whose underlying `TSem` starts with one unit.
--
newTxMempoolSem :: MonadSTM m => m (TxMempoolSem m)
newTxMempoolSem = TxMempoolSem <$> atomically (newTSem 1)

-- | API to access `PeerTxState` inside `PeerTxStateVar`.
--
data PeerTxAPI m txid tx = PeerTxAPI {
    readTxDecision      :: m (TxDecision txid tx),
    -- ^ a blocking action which reads `TxDecision`

    handleReceivedTxIds :: NumTxIdsToReq
                        -> StrictSeq txid
                        -- ^ received txids
                        -> Map txid SizeInBytes
                        -- ^ received sizes of advertised tx's
                        -> m (),
    -- ^ handle received txids

    handleReceivedTxs   :: Map txid SizeInBytes
                        -- ^ requested txids
                        -> Map txid tx
                        -- ^ received txs
                        -> m (Maybe TxSubmissionProtocolError),
    -- ^ handle received txs

    submitTxToMempool     :: Tracer m (TraceTxSubmissionInbound txid tx)
                          -> txid -> tx -> m ()
    -- ^ submit the given (txid, tx) to the mempool.
  }


-- | Outcome of an attempt to add a single tx to the mempool.  `TxRejected` is
-- also used when the mempool snapshot already contains the tx.
data TxMempoolResult = TxAccepted | TxRejected

-- | A bracket function which registers / de-registers a new peer in
-- `SharedTxStateVar` and `TxChannelsVar`, and exposes a `PeerTxAPI` to the
-- callback.  `PeerTxAPI` is only safe inside the `withPeer` scope.
--
-- On entry a decision channel for the peer is looked up (or created) in the
-- `TxChannelsVar` and an empty `PeerTxState` is inserted into the shared
-- state; on exit both registrations are removed again.
--
withPeer
    :: forall tx peeraddr txid idx m a.
       ( MonadMask m
       , MonadMVar m
       , MonadSTM  m
       , MonadMonotonicTime m
       , Ord txid
       , Show txid
       , Typeable txid
       , Ord peeraddr
       , Show peeraddr
       )
    => Tracer m (TraceTxLogic peeraddr txid tx)
    -> TxChannelsVar m peeraddr txid tx
    -> TxMempoolSem m
    -> TxDecisionPolicy
    -> SharedTxStateVar m peeraddr txid tx
    -> TxSubmissionMempoolReader txid tx idx m
    -> TxSubmissionMempoolWriter txid tx idx m
    -> (tx -> SizeInBytes)
    -> peeraddr
    -- ^ new peer
    -> (PeerTxAPI m txid tx -> m a)
    -- ^ callback which gives access to `PeerTxAPI`
    -> m a
withPeer tracer
         channelsVar
         (TxMempoolSem mempoolSem)
         policy@TxDecisionPolicy { bufferedTxsMinLifetime }
         sharedStateVar
         TxSubmissionMempoolReader { mempoolGetSnapshot }
         TxSubmissionMempoolWriter { mempoolAddTxs }
         txSize
         peeraddr io =
    bracket
      (do -- create a communication channel
          !peerTxAPI <-
            modifyMVar channelsVar
              \ TxChannels { txChannelMap } -> do
                chann <- newEmptyMVar
                -- if the peer already has a channel registered it is reused,
                -- otherwise the freshly created one is inserted
                let (chann', txChannelMap') =
                      Map.alterF (\mbChann ->
                                   let !chann'' = fromMaybe chann mbChann
                                   in (chann'', Just chann''))
                                 peeraddr
                                 txChannelMap
                return
                  ( TxChannels { txChannelMap = txChannelMap' }
                  , PeerTxAPI { readTxDecision = takeMVar chann',
                                handleReceivedTxIds,
                                handleReceivedTxs,
                                submitTxToMempool }
                  )

          atomically $ modifyTVar sharedStateVar registerPeer
          return peerTxAPI
      )
      -- the handler is a short blocking operation, thus we need to use
      -- `uninterruptibleMask_`
      (\_ -> uninterruptibleMask_ do
        atomically $ modifyTVar sharedStateVar unregisterPeer
        modifyMVar_ channelsVar
          \ TxChannels { txChannelMap } ->
            return TxChannels { txChannelMap = Map.delete peeraddr txChannelMap }
      )
      io
  where
    -- Insert an empty `PeerTxState` for `peeraddr` into the shared state
    -- (`Map.insert` replaces any entry already stored under that key).
    registerPeer :: SharedTxState peeraddr txid tx
                 -> SharedTxState peeraddr txid tx
    registerPeer st@SharedTxState { peerTxStates } =
      st { peerTxStates =
             Map.insert
               peeraddr
               PeerTxState {
                 availableTxIds           = Map.empty,
                 requestedTxIdsInflight   = 0,
                 requestedTxsInflightSize = 0,
                 requestedTxsInflight     = Set.empty,
                 unacknowledgedTxIds      = StrictSeq.empty,
                 unknownTxs               = Set.empty,
                 score                    = 0,
                 scoreTs                  = Time 0,
                 downloadedTxs            = Map.empty,
                 toMempoolTxs             = Map.empty }
               peerTxStates
         }

    -- Remove `peeraddr` from the shared state and undo its contribution to
    -- the shared counters.  Calls `error` if the peer is not registered.
    --
    -- TODO: this function needs to be tested!
    -- Issue: https://github.com/IntersectMBO/ouroboros-network/issues/5151
    unregisterPeer :: SharedTxState peeraddr txid tx
                   -> SharedTxState peeraddr txid tx
    unregisterPeer st@SharedTxState { peerTxStates,
                                      bufferedTxs,
                                      referenceCounts,
                                      inflightTxs,
                                      inflightTxsSize,
                                      inSubmissionToMempoolTxs } =
        st { peerTxStates             = peerTxStates',
             bufferedTxs              = bufferedTxs',
             referenceCounts          = referenceCounts',
             inflightTxs              = inflightTxs',
             inflightTxsSize          = inflightTxsSize',
             inSubmissionToMempoolTxs = inSubmissionToMempoolTxs' }
      where
        -- extract the peer's state and delete it from the map in one pass
        (PeerTxState { unacknowledgedTxIds,
                       requestedTxsInflight,
                       requestedTxsInflightSize,
                       toMempoolTxs }
          , peerTxStates')
          =
          Map.alterF
            (\case
              Nothing -> error ("TxSubmission.withPeer: invariant violation for peer " ++ show peeraddr)
              Just a  -> (a, Nothing))
            peeraddr
            peerTxStates

        -- drop one reference for every txid the peer had not yet
        -- acknowledged; an entry whose count is not above 1 is removed
        referenceCounts' =
          Foldable.foldl'
            (flip $ Map.update \cnt ->
              if cnt > 1
              then Just $! pred cnt
              else Nothing
            )
          referenceCounts
          unacknowledgedTxIds

        -- txids which are still referenced after the update above
        liveSet = Map.keysSet referenceCounts'

        -- keep only the buffered txs which are still referenced
        bufferedTxs' = bufferedTxs
                       `Map.restrictKeys`
                       liveSet

        -- undo the peer's in-flight tx requests, both counts and size
        inflightTxs' = Foldable.foldl' purgeInflightTxs inflightTxs requestedTxsInflight
        inflightTxsSize' = inflightTxsSize - requestedTxsInflightSize

        -- When we unregister a peer, we need to subtract all txs in the
        -- `toMempoolTxs`, as they will not be submitted to the mempool.
        inSubmissionToMempoolTxs' =
          Foldable.foldl' (flip $ Map.update \cnt ->
               if cnt > 1
               then Just $! pred cnt
               else Nothing
            )
          inSubmissionToMempoolTxs
          (Map.keysSet toMempoolTxs)

        -- decrement the counter stored under `txid`; the entry is removed
        -- when its count is not above 1, and a missing entry stays missing
        purgeInflightTxs m txid = Map.alter fn txid m
          where
            fn (Just n) | n > 1 = Just $! pred n
            fn _                = Nothing

    --
    -- PeerTxAPI
    --

    -- Submit a single tx to the mempool while holding `mempoolSem`, then
    -- record the outcome in the shared state and trace it together with the
    -- time the submission took.
    submitTxToMempool :: Tracer m (TraceTxSubmissionInbound txid tx) -> txid -> tx -> m ()
    submitTxToMempool txTracer txid tx =
        bracket_ (atomically $ waitTSem mempoolSem)
                 (atomically $ signalTSem mempoolSem)
          $ do
            start <- getMonotonicTime
            res <- addTx
            end <- getMonotonicTime
            atomically $ modifyTVar sharedStateVar (updateBufferedTx end res)
            let duration = end `diffTime` start
            case res of
              TxAccepted -> traceWith txTracer (TraceTxInboundAddedToMempool [txid] duration)
              TxRejected -> traceWith txTracer (TraceTxInboundRejectedFromMempool [txid] duration)

      where
        -- add the tx to the mempool
        addTx :: m TxMempoolResult
        addTx = do
          mpSnapshot <- atomically mempoolGetSnapshot

          -- Note that checking if the mempool contains a TX before
          -- spending several ms attempting to add it to the pool has
          -- been judged immoral.
          if mempoolHasTx mpSnapshot txid
             then do
               !now <- getMonotonicTime
               !s <- countRejectedTxs now 1
               traceWith txTracer $ TraceTxSubmissionProcessed ProcessedTxCount {
                    ptxcAccepted = 0
                  , ptxcRejected = 1
                  , ptxcScore    = s
                  }
               return TxRejected
             else do
               acceptedTxs <- mempoolAddTxs [tx]
               end <- getMonotonicTime
               if null acceptedTxs
                  then do
                      !s <- countRejectedTxs end 1
                      traceWith txTracer $ TraceTxSubmissionProcessed ProcessedTxCount {
                          ptxcAccepted = 0
                        , ptxcRejected = 1
                        , ptxcScore    = s
                        }
                      return TxRejected
                  else do
                      !s <- countRejectedTxs end 0
                      traceWith txTracer $ TraceTxSubmissionProcessed ProcessedTxCount {
                          ptxcAccepted = 1
                        , ptxcRejected = 0
                        , ptxcScore    = s
                        }
                      return TxAccepted

        -- Record the result of the submission in the shared state.  In both
        -- cases the tx is removed from the peer's `toMempoolTxs` and its
        -- `inSubmissionToMempoolTxs` counter is decremented; an accepted tx
        -- is additionally buffered, reference counted and scheduled in
        -- `timedTxs` at `bufferedTxsMinLifetime` after the given time.
        updateBufferedTx :: Time
                         -> TxMempoolResult
                         -> SharedTxState peeraddr txid tx
                         -> SharedTxState peeraddr txid tx
        updateBufferedTx _ TxRejected st@SharedTxState { peerTxStates
                                                       , inSubmissionToMempoolTxs } =
            st { peerTxStates = peerTxStates'
               , inSubmissionToMempoolTxs = inSubmissionToMempoolTxs' }
          where
            inSubmissionToMempoolTxs' =
              Map.update (\case 1 -> Nothing; n -> Just $! pred n)
                         txid inSubmissionToMempoolTxs

            peerTxStates' = Map.update fn peeraddr peerTxStates
              where
                fn ps = Just $! ps { toMempoolTxs = Map.delete txid (toMempoolTxs ps)}

        updateBufferedTx now TxAccepted
                         st@SharedTxState { peerTxStates
                                          , bufferedTxs
                                          , referenceCounts
                                          , timedTxs
                                          , inSubmissionToMempoolTxs } =
            st { peerTxStates = peerTxStates'
               , bufferedTxs = bufferedTxs'
               , timedTxs = timedTxs'
               , referenceCounts = referenceCounts'
               , inSubmissionToMempoolTxs = inSubmissionToMempoolTxs'
               }
          where
            inSubmissionToMempoolTxs' =
              Map.update (\case 1 -> Nothing; n -> Just $! pred n)
                         txid inSubmissionToMempoolTxs

            -- schedule the txid under the time `bufferedTxsMinLifetime`
            -- after `now`
            timedTxs' = Map.alter fn (addTime bufferedTxsMinLifetime now) timedTxs
              where
                fn :: Maybe [txid] -> Maybe [txid]
                fn Nothing      = Just [txid]
                fn (Just txids) = Just $! (txid:txids)

            -- add one reference to the txid (creating the entry if needed)
            referenceCounts' = Map.alter fn txid referenceCounts
              where
                fn :: Maybe Int -> Maybe Int
                fn Nothing  = Just 1
                fn (Just n) = Just $! succ n

            bufferedTxs' =  Map.insert txid (Just tx) bufferedTxs

            peerTxStates' = Map.update fn peeraddr peerTxStates
              where
                fn ps = Just $! ps { toMempoolTxs = Map.delete txid (toMempoolTxs ps)}

    -- Thin wrapper around `receivedTxIds`: the three arguments are forwarded
    -- unchanged, while `tracer`, `sharedStateVar`, `mempoolGetSnapshot` and
    -- `peeraddr` are taken from the enclosing scope.
    --
    handleReceivedTxIds :: NumTxIdsToReq
                        -> StrictSeq txid
                        -> Map txid SizeInBytes
                        -> m ()
    handleReceivedTxIds numTxIdsToReq txidsSeq txidsMap =
      receivedTxIds tracer
                    sharedStateVar
                    mempoolGetSnapshot
                    peeraddr
                    numTxIdsToReq
                    txidsSeq
                    txidsMap


    -- Thin wrapper around `collectTxs`: both maps are forwarded unchanged,
    -- while `tracer`, `txSize`, `sharedStateVar` and `peeraddr` are taken from
    -- the enclosing scope.  The result of `collectTxs` is returned as is.
    --
    handleReceivedTxs :: Map txid SizeInBytes
                      -- ^ requested txids with their announced size
                      -> Map txid tx
                      -- ^ received txs
                      -> m (Maybe TxSubmissionProtocolError)
    handleReceivedTxs txids txs =
      collectTxs tracer txSize sharedStateVar peeraddr txids txs

    -- Update `score` & `scoreTs` fields of `PeerTxState`, return the new
    -- updated `score`.
    --
    -- The update is a single `stateTVar` transaction on `sharedStateVar`; the
    -- new fields are computed by `updateRejects`.
    --
    -- PRECONDITION: the `Double` argument is non-negative.  A negative value,
    -- or a peer which is missing from `peerTxStates`, results in an `error`
    -- call.
    countRejectedTxs :: Time
                     -> Double
                     -> m Double
    countRejectedTxs _ n
      | n < 0
      = error ("TxSubmission.countRejectedTxs: invariant violation for peer " ++ show peeraddr)
    countRejectedTxs now n =
        atomically $ stateTVar sharedStateVar updateShared
      where
        -- Rewrite this peer's entry and pair the new score with the updated
        -- shared state, as required by `stateTVar`.
        updateShared st =
          let (newScore, peerTxStates') =
                Map.alterF bumpScore peeraddr (peerTxStates st)
          in (newScore, st { peerTxStates = peerTxStates' })

        bumpScore :: Maybe (PeerTxState txid tx)
                  -> (Double, Maybe (PeerTxState txid tx))
        bumpScore Nothing =
          error ("TxSubmission.withPeer: invariant violation for peer " ++ show peeraddr)
        bumpScore (Just ps) =
          -- `$!` forces the updated peer state before it is stored in the map
          let ps' = updateRejects policy now n ps
          in (score ps', Just $! ps')


-- | Drain a peer's rejection `score` for the time elapsed since `scoreTs` and
-- then add `n` new rejections.
--
-- * The drain is @elapsed * scoreRate@, and the drained score never drops
--   below @0@.
-- * The resulting score is capped at `scoreMax`.
-- * When both `n` and the current `score` are @0@ only the timestamp is
--   refreshed.
--
-- `now` is supplied by the caller and may be older than the stored `scoreTs`
-- (e.g. when another thread updated the peer in the meantime).  In that case
-- the elapsed time is treated as @0@, so the score is never increased by a
-- negative drain, and `scoreTs` is never moved backwards.
--
updateRejects :: TxDecisionPolicy
              -> Time
              -> Double
              -> PeerTxState txid tx
              -> PeerTxState txid tx
updateRejects _ now 0 pts
  | score pts == 0
  = pts { scoreTs = max now (scoreTs pts) }
updateRejects TxDecisionPolicy { scoreRate, scoreMax } now n
              pts@PeerTxState { score, scoreTs } =
    let -- clamp: a stale `now` must not produce a negative drain
        duration = max 0 (diffTime now scoreTs)
        !drain   = realToFrac duration * scoreRate
        !drained = max 0 (score - drain)
    in pts { score   = min scoreMax (drained + n)
           , scoreTs = max now scoreTs
           }


-- | Periodic maintenance loop over the shared tx state; never returns.
--
-- The thread labels itself @"tx-rejection-drain"@ and then wakes up once per
-- `tickInterval`.  On every wake-up it atomically:
--
-- * applies `updateRejects` with zero new rejections to every peer, but only
--   once the current time has passed the next scheduled drain (drains are
--   scheduled `drainInterval` apart);
-- * applies `tickTimedTxs` to the shared state.
--
-- The resulting state is traced with `TraceSharedTxState` on every iteration.
--
drainRejectionThread
    :: forall m peeraddr txid tx.
       ( MonadDelay m
       , MonadSTM m
       , MonadThread m
       , Ord txid
       )
    => Tracer m (TraceTxLogic peeraddr txid tx)
    -> TxDecisionPolicy
    -> SharedTxStateVar m peeraddr txid tx
    -> m Void
drainRejectionThread tracer policy sharedStateVar = do
    labelThisThread "tx-rejection-drain"
    start <- getMonotonicTime
    go (addTime drainInterval start)
  where
    -- time between two score drains
    drainInterval :: DiffTime
    drainInterval = 7

    -- time between two wake-ups of the loop
    tickInterval :: DiffTime
    tickInterval = 1

    -- The argument is the time after which the next score drain is due.
    go :: Time -> m Void
    go !nextDrain = do
      threadDelay tickInterval

      !now <- getMonotonicTime
      let !drainDue = now > nextDrain
      newState <- atomically $ do
        st <- readTVar sharedStateVar
        let ptss | drainDue  = Map.map (updateRejects policy now 0)
                                       (peerTxStates st)
                 | otherwise = peerTxStates st
            st' = tickTimedTxs now (st { peerTxStates = ptss })
        writeTVar sharedStateVar st'
        return st'
      traceWith tracer (TraceSharedTxState "drainRejectionThread" newState)

      -- reschedule only if a drain was performed in this iteration
      go $ if drainDue
              then addTime drainInterval now
              else nextDrain


-- | Decision making loop; never returns.
--
-- The thread labels itself @"tx-decision"@ and then repeats:
--
-- 1. wait for `_DECISION_LOOP_DELAY`;
-- 2. in a single STM transaction: read the shared state, compute the active
--    peers with `filterActivePeers`, retry until there is at least one, run
--    `makeDecisions` and write the new shared state back;
-- 3. trace the new shared state and the decisions;
-- 4. for every peer which has both a channel in `txChannelMap` and a
--    decision, deliver the decision through the peer's `StrictMVar`: if the
--    `StrictMVar` still holds a decision, the new one is appended with `<>`,
--    otherwise it is simply put;
-- 5. trace `mkTxSubmissionCounters` of the new shared state.
--
decisionLogicThread
    :: forall m peeraddr txid tx.
       ( MonadDelay m
       , MonadMVar  m
       , MonadSTM   m
       , MonadMask  m
       , MonadFork  m
       , Ord peeraddr
       , Ord txid
       , Hashable peeraddr
       )
    => Tracer m (TraceTxLogic peeraddr txid tx)
    -> Tracer m TxSubmissionCounters
    -> TxDecisionPolicy
    -> TxChannelsVar m peeraddr txid tx
    -> SharedTxStateVar m peeraddr txid tx
    -> m Void
decisionLogicThread tracer counterTracer policy txChannelsVar sharedStateVar = do
    labelThisThread "tx-decision"
    go
  where
    go :: m Void
    go = do
      -- We rate limit the decision making process, it could overwhelm the CPU
      -- if there are too many inbound connections.
      threadDelay _DECISION_LOOP_DELAY

      (decisions, st) <- atomically do
        sharedTxState <- readTVar sharedStateVar
        let activePeers = filterActivePeers policy sharedTxState

        -- block until at least one peer is active
        check (not (Map.null activePeers))

        let (sharedState, peerDecisions) =
              makeDecisions policy sharedTxState activePeers
        writeTVar sharedStateVar sharedState
        return (peerDecisions, sharedState)
      traceWith tracer (TraceSharedTxState "decisionLogicThread" st)
      traceWith tracer (TraceTxDecisions decisions)

      TxChannels { txChannelMap } <- readMVar txChannelsVar
      -- only peers present in both maps receive a decision
      traverse_
        (\(mvar, d) ->
            -- the pending decision (if any) stays on the left of `<>`
            modifyMVarWithDefault_ mvar d (\pending -> pure (pending <> d)))
        (Map.intersectionWith (,) txChannelMap decisions)

      traceWith counterTracer (mkTxSubmissionCounters st)
      go

    -- Variant of modifyMVar_ that puts a default value if the MVar is empty.
    --
    -- Runs with asynchronous exceptions masked; only the user action runs
    -- under `restore`.  If the action throws, the value which was taken is
    -- put back before the exception propagates.
    modifyMVarWithDefault_ :: StrictMVar m a -> a -> (a -> m a) -> m ()
    modifyMVarWithDefault_ var def io =
      mask $ \restore ->
        tryTakeMVar var >>= \case
          Nothing -> putMVar var def
          Just a  -> do
            a' <- restore (io a) `onException` putMVar var a
            putMVar var a'


-- | Run `decisionLogicThread` and `drainRejectionThread`.
--
-- Both threads are started with `concurrently` and share the same `tracer`,
-- `policy` and `sharedStateVar`.  Each of them has result type `Void`; the
-- pair of results is combined with `<>` only to satisfy the return type.
--
decisionLogicThreads
    :: forall m peeraddr txid tx.
       ( MonadDelay m
       , MonadMVar  m
       , MonadMask  m
       , MonadAsync m
       , MonadFork  m
       , Ord peeraddr
       , Ord txid
       , Hashable peeraddr
       )
    => Tracer m (TraceTxLogic peeraddr txid tx)
    -> Tracer m TxSubmissionCounters
    -> TxDecisionPolicy
    -> TxChannelsVar m peeraddr txid tx
    -> SharedTxStateVar m peeraddr txid tx
    -> m Void
decisionLogicThreads tracer counterTracer policy txChannelsVar sharedStateVar =
    uncurry (<>) <$> concurrently drainThread decisionThread
  where
    drainThread :: m Void
    drainThread =
      drainRejectionThread tracer policy sharedStateVar

    decisionThread :: m Void
    decisionThread =
      decisionLogicThread tracer counterTracer policy txChannelsVar sharedStateVar


-- | Pause between two iterations of the decision loop: `5ms` (the value is
-- expressed in seconds).
--
_DECISION_LOOP_DELAY :: DiffTime
_DECISION_LOOP_DELAY = 0.005