{-# LANGUAGE BangPatterns        #-}
{-# LANGUAGE GADTs               #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE RecordWildCards     #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Ouroboros.Network.TxSubmission.Outbound
  ( txSubmissionOutbound
  , TraceTxSubmissionOutbound (..)
  , TxSubmissionProtocolError (..)
  ) where

import Data.Foldable (find)
import Data.List.NonEmpty qualified as NonEmpty
import Data.Maybe (catMaybes, isNothing, mapMaybe)
import Data.Sequence.Strict (StrictSeq)
import Data.Sequence.Strict qualified as Seq

import Control.Exception (assert)
import Control.Monad (unless, when)
import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadThrow
import Control.Tracer (Tracer, traceWith)

import Ouroboros.Network.ControlMessage (ControlMessage, ControlMessageSTM,
           timeoutWithControlMessage)
import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion)
import Ouroboros.Network.Protocol.TxSubmission2.Client
import Ouroboros.Network.Protocol.TxSubmission2.Type
import Ouroboros.Network.TxSubmission.Mempool.Reader (MempoolSnapshot (..),
           TxSubmissionMempoolReader (..))


-- | Trace events for the outbound side of the tx-submission protocol.
--
-- 'txSubmissionOutbound' emits the first two events while serving a request
-- for transactions: one with the requested txids, one with the transactions
-- actually sent back.
data TraceTxSubmissionOutbound txid tx
  = TraceTxSubmissionOutboundRecvMsgRequestTxs
      [txid]
      -- ^ The IDs of the transactions requested.
  | TraceTxSubmissionOutboundSendMsgReplyTxs
      [tx]
      -- ^ The transactions to be sent in the response.
  | TraceControlMessage ControlMessage
      -- ^ A control message. NOTE(review): not emitted by
      -- 'txSubmissionOutbound' itself; confirm where it is produced.
  deriving Show

-- | Protocol violations committed by the remote peer, thrown as exceptions
-- by 'txSubmissionOutbound'.
data TxSubmissionProtocolError =
       -- | The peer acknowledged more txids than are currently unacknowledged.
       ProtocolErrorAckedTooManyTxids
       -- | The peer requested zero txids (for a non-blocking request: zero
       -- txids while also acknowledging zero).
     | ProtocolErrorRequestedNothing
       -- | The peer requested so many txids that the number in flight would
       -- exceed the limit. Carries the requested number and the limit.
     | ProtocolErrorRequestedTooManyTxids NumTxIdsToReq NumTxIdsToAck
       -- | A blocking request was made while txids were still unacknowledged.
     | ProtocolErrorRequestBlocking
       -- | A non-blocking request was made while no txids were unacknowledged.
     | ProtocolErrorRequestNonBlocking
       -- | The peer requested a transaction whose txid is not in the set of
       -- unacknowledged txids.
     | ProtocolErrorRequestedUnavailableTx
  deriving Show

-- | Human-readable descriptions of each protocol violation, phrased from the
-- point of view of the local node ("the peer ...").
instance Exception TxSubmissionProtocolError where
  displayException ProtocolErrorAckedTooManyTxids =
      "The peer tried to acknowledged more txids than are available to do so."

  -- Includes both the offending request size and the configured limit.
  displayException (ProtocolErrorRequestedTooManyTxids reqNo maxUnacked) =
      "The peer requested " ++ show reqNo ++ " txids which would put the "
   ++ "total in flight over the limit of " ++ show maxUnacked

  displayException ProtocolErrorRequestedNothing =
      "The peer requested zero txids."

  displayException ProtocolErrorRequestBlocking =
      "The peer made a blocking request for more txids when there are still "
   ++ "unacknowledged txids. It should have used a non-blocking request."

  displayException ProtocolErrorRequestNonBlocking =
      "The peer made a non-blocking request for more txids when there are "
   ++ "no unacknowledged txids. It should have used a blocking request."

  displayException ProtocolErrorRequestedUnavailableTx =
      "The peer requested a transaction which is not available, either "
   ++ "because it was never available or because it was previously requested."


-- | Outbound (client) side of the tx-submission protocol.
--
-- Between requests the client carries two pieces of state: the sequence of
-- txids that were announced to the peer and not yet acknowledged (each paired
-- with its mempool index), and the mempool index of the last txid announced.
-- It starts with an empty sequence and 'mempoolZeroIdx'.
--
-- Requests from the peer are validated against that state; any violation is
-- reported by throwing a 'TxSubmissionProtocolError'.
--
-- The 'NodeToNodeVersion' argument is currently unused. The
-- 'ControlMessageSTM' is only consulted while waiting on a blocking request
-- for txids: if 'timeoutWithControlMessage' yields 'Nothing' the client
-- terminates the protocol with 'SendMsgDone'.
txSubmissionOutbound
  :: forall txid tx idx m.
     (Ord txid, Ord idx, MonadSTM m, MonadThrow m)
  => Tracer m (TraceTxSubmissionOutbound txid tx)
  -> NumTxIdsToAck  -- ^ Maximum number of unacknowledged txids allowed
  -> TxSubmissionMempoolReader txid tx idx m
  -> NodeToNodeVersion
  -> ControlMessageSTM m
  -> TxSubmissionClient txid tx m ()
txSubmissionOutbound tracer maxUnacked TxSubmissionMempoolReader{..} _version controlMessageSTM =
    TxSubmissionClient (pure (client Seq.empty mempoolZeroIdx))
  where
    -- | The idle state of the client, parameterised by the unacknowledged
    -- txids (oldest first) and the mempool index of the last announced txid.
    client :: StrictSeq (txid, idx) -> idx -> ClientStIdle txid tx m ()
    client !unackedSeq !lastIdx =
        ClientStIdle { recvMsgRequestTxIds, recvMsgRequestTxs }
      where
        -- | Handle a request for more txids: validate it, drop the
        -- acknowledged prefix, and reply with txids that appeared in the
        -- mempool after 'lastIdx' (at most @reqNo@ of them).
        recvMsgRequestTxIds :: forall blocking.
                               SingBlockingStyle blocking
                            -> NumTxIdsToAck
                            -> NumTxIdsToReq
                            -> m (ClientStTxIds blocking txid tx m ())
        recvMsgRequestTxIds blocking ackNo reqNo = do

          -- Validate in 'Int' rather than 'Word16': the peer controls both
          -- counters, and @unacked - acked + requested@ computed in 'Word16'
          -- can wrap around, letting an oversized request slip under the
          -- limit.
          let unackedNo, ackedNo, requestedNo, limitNo :: Int
              unackedNo   = Seq.length unackedSeq
              ackedNo     = fromIntegral (getNumTxIdsToAck ackNo)
              requestedNo = fromIntegral (getNumTxIdsToReq reqNo)
              limitNo     = fromIntegral (getNumTxIdsToAck maxUnacked)

          when (ackedNo > unackedNo) $
            throwIO ProtocolErrorAckedTooManyTxids

          when (unackedNo - ackedNo + requestedNo > limitNo) $
            throwIO (ProtocolErrorRequestedTooManyTxids reqNo maxUnacked)

          -- Update our tracking state to remove the number of txids that the
          -- peer has acknowledged.
          let !unackedSeq' = Seq.drop ackedNo unackedSeq

          -- Update our tracking state with any extra txs available.
          let update txs =
                -- These txs should all be fresh
                assert (all (\(_, idx, _) -> idx > lastIdx) txs) $
                  let !unackedSeq'' = unackedSeq' <> Seq.fromList
                                        [ (txid, idx) | (txid, idx, _) <- txs ]
                      -- 'last' is only reached when @txs@ is non-empty.
                      !lastIdx'
                        | null txs  = lastIdx
                        | otherwise = idx where (_, idx, _) = last txs
                      txs'         :: [(txid, SizeInBytes)]
                      txs'          = [ (txid, size) | (txid, _, size) <- txs ]
                      client'       = client unackedSeq'' lastIdx'
                  in  (txs', client')

          -- Grab info about any new txs after the last tx idx we've seen,
          -- up to the number that the peer has requested.
          case blocking of
            SingBlocking -> do
              when (reqNo == 0) $
                throwIO ProtocolErrorRequestedNothing
              -- A blocking request is only legal once everything has been
              -- acknowledged.
              unless (Seq.null unackedSeq') $
                throwIO ProtocolErrorRequestBlocking

              -- Block until the mempool has something new, unless
              -- 'timeoutWithControlMessage' gives up first.
              mbtxs <- timeoutWithControlMessage controlMessageSTM $
                do
                  MempoolSnapshot{mempoolTxIdsAfter} <- mempoolGetSnapshot
                  let txs = mempoolTxIdsAfter lastIdx
                  check (not $ null txs)
                  pure (take (fromIntegral reqNo) txs)

              case mbtxs of
                Nothing -> pure (SendMsgDone ())
                Just txs ->
                  let !(txs', client') = update txs
                      txs'' = case NonEmpty.nonEmpty txs' of
                        Just x -> x
                        -- Assert txs is non-empty: we blocked until txs was non-null,
                        -- and we know reqNo > 0, hence `take reqNo txs` is non-null.
                        Nothing -> error "txSubmissionOutbound: empty transaction's list"
                  in  pure (SendMsgReplyTxIds (BlockingReply txs'') client')

            SingNonBlocking -> do
              when (reqNo == 0 && ackNo == 0) $
                throwIO ProtocolErrorRequestedNothing
              -- A non-blocking request is only legal while some txids remain
              -- unacknowledged.
              when (Seq.null unackedSeq') $
                throwIO ProtocolErrorRequestNonBlocking

              txs <- atomically $ do
                MempoolSnapshot{mempoolTxIdsAfter} <- mempoolGetSnapshot
                let txs = mempoolTxIdsAfter lastIdx
                return (take (fromIntegral reqNo) txs)

              let !(txs', client') = update txs
              pure (SendMsgReplyTxIds (NonBlockingReply txs') client')

        -- | Handle a request for transaction bodies. Every requested txid
        -- must be among the unacknowledged ones; the tracking state is left
        -- unchanged.
        recvMsgRequestTxs :: [txid]
                          -> m (ClientStTxs txid tx m ())
        recvMsgRequestTxs txids = do
          -- Trace the IDs of the transactions requested.
          traceWith tracer (TraceTxSubmissionOutboundRecvMsgRequestTxs txids)

          MempoolSnapshot{mempoolLookupTx} <- atomically mempoolGetSnapshot

          -- The window size is expected to be small (currently 10) so the find is acceptable.
          let txidxs  = [ find (\(t,_) -> t == txid) unackedSeq | txid <- txids ]
              txidxs' = map snd $ catMaybes txidxs

          when (any isNothing txidxs) $
            throwIO ProtocolErrorRequestedUnavailableTx

          -- The 'mempoolLookupTx' will return nothing if the transaction is no
          -- longer in the mempool. This is good. Neither the sending nor
          -- receiving side wants to forward txs that are no longer of interest.
          let txs          = mapMaybe mempoolLookupTx txidxs'
              client'      = client unackedSeq lastIdx

          -- Trace the transactions to be sent in the response.
          traceWith tracer (TraceTxSubmissionOutboundSendMsgReplyTxs txs)

          return $ SendMsgReplyTxs txs client'