{-# LANGUAGE BangPatterns        #-}
{-# LANGUAGE DataKinds           #-}
{-# LANGUAGE GADTs               #-}
{-# LANGUAGE KindSignatures      #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Ouroboros.Network.Protocol.ObjectDiffusion.Examples
  ( testObjectDiffusionOutbound
  , TraceObjectDiffusionTestImplem (..)
  , testObjectDiffusionInbound
  , InboundState (..)
  , initialInboundState
  , WithCaughtUpDetection (..)
  ) where


import Network.TypedProtocol.Core

import Ouroboros.Network.Protocol.ObjectDiffusion.Inbound
import Ouroboros.Network.Protocol.ObjectDiffusion.Outbound
import Ouroboros.Network.Protocol.ObjectDiffusion.Type (BlockingReplyList (..),
           NumObjectIdsAck (..), NumObjectIdsReq (..), SingBlockingStyle (..))

import Control.Exception (assert)
import Control.Monad (when)
import Control.Tracer (Tracer, traceWith)
import Data.Foldable qualified as Foldable
import Data.List.NonEmpty (NonEmpty (..))
import Data.List.NonEmpty qualified as NonEmpty
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Sequence.Strict (StrictSeq)
import Data.Sequence.Strict qualified as Seq
import Data.Set (Set)
import Data.Set qualified as Set
import Data.Word (Word16)

-- | This helper typeclass allows the inbound and outbound tests implementation
-- to finish the protocol gracefully when all the desired objects have been sent.
--
-- | Normally, the outbound side should always respond with a non-empty list of
-- object IDs to a blocking request, but instead it can respond with the
-- 'caughtUpSentinel' (still inhabiting 'NonEmpty objectId') to indicate that
-- there are no more objects to send, and the inbound side can use 'ifCaughtUp'
-- to detect this and terminate the protocol gracefully.
--
-- | This suggests that the concrete type used for 'objectId' has some special
-- values that are not actually valid object IDs, but are used to signal this
-- condition. This is a bit hacky, but it allows us to keep the example
-- implementations simple and focused on the pipelining aspect, without having
-- to introduce additional protocol messages or state to handle termination.
class Eq objectId => WithCaughtUpDetection objectId where
  -- | This is a special value that the outbound implementation can use to
  -- signal to the inbound implementation that there are no more objects to send.
  -- This ought to be a value that uses non-normal object IDs, but still
  -- inhabits 'NonEmpty objectId'.
  caughtUpSentinel :: NonEmpty objectId

  -- | This is a helper function used in the inbound implementation to terminate
  -- the protocol gracefully when all objects that the outbound peer wanted to
  -- send have actually been sent.
  ifCaughtUp
    :: InboundStIdle 'Z objectId object m a
    -> (NonEmpty objectId -> InboundStIdle 'Z objectId object m a)
    -> NonEmpty objectId
    -> InboundStIdle 'Z objectId object m a
  ifCaughtUp InboundStIdle 'Z objectId object m a
fCaughtUp NonEmpty objectId -> InboundStIdle 'Z objectId object m a
fElse NonEmpty objectId
objectIds =
    if NonEmpty objectId
objectIds NonEmpty objectId -> NonEmpty objectId -> Bool
forall a. Eq a => a -> a -> Bool
== NonEmpty objectId
forall objectId.
WithCaughtUpDetection objectId =>
NonEmpty objectId
caughtUpSentinel
       then InboundStIdle 'Z objectId object m a
fCaughtUp
       else NonEmpty objectId -> InboundStIdle 'Z objectId object m a
fElse NonEmpty objectId
objectIds

--
-- Outbound implementation
--

data TraceObjectDiffusionTestImplem objectId object =
    EventRecvMsgRequestObjectIds
      (StrictSeq objectId)
      (Map objectId object)
      [object]
      NumObjectIdsAck
      NumObjectIdsReq
  | EventRecvMsgRequestObjects
      (StrictSeq objectId)
      (Map objectId object)
      [object]
      [objectId]
  deriving Int -> TraceObjectDiffusionTestImplem objectId object -> ShowS
[TraceObjectDiffusionTestImplem objectId object] -> ShowS
TraceObjectDiffusionTestImplem objectId object -> String
(Int -> TraceObjectDiffusionTestImplem objectId object -> ShowS)
-> (TraceObjectDiffusionTestImplem objectId object -> String)
-> ([TraceObjectDiffusionTestImplem objectId object] -> ShowS)
-> Show (TraceObjectDiffusionTestImplem objectId object)
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
forall objectId object.
(Show objectId, Show object) =>
Int -> TraceObjectDiffusionTestImplem objectId object -> ShowS
forall objectId object.
(Show objectId, Show object) =>
[TraceObjectDiffusionTestImplem objectId object] -> ShowS
forall objectId object.
(Show objectId, Show object) =>
TraceObjectDiffusionTestImplem objectId object -> String
$cshowsPrec :: forall objectId object.
(Show objectId, Show object) =>
Int -> TraceObjectDiffusionTestImplem objectId object -> ShowS
showsPrec :: Int -> TraceObjectDiffusionTestImplem objectId object -> ShowS
$cshow :: forall objectId object.
(Show objectId, Show object) =>
TraceObjectDiffusionTestImplem objectId object -> String
show :: TraceObjectDiffusionTestImplem objectId object -> String
$cshowList :: forall objectId object.
(Show objectId, Show object) =>
[TraceObjectDiffusionTestImplem objectId object] -> ShowS
showList :: [TraceObjectDiffusionTestImplem objectId object] -> ShowS
Show


testObjectDiffusionOutbound
  :: forall objectId object m.
     (Ord objectId, Show objectId, Monad m, WithCaughtUpDetection objectId)
  => Tracer m (TraceObjectDiffusionTestImplem objectId object)
  -> (object -> objectId)
  -> Word16  -- ^ Maximum number of unacknowledged object IDs allowed
  -> [object]
  -> ObjectDiffusionOutbound objectId object m ()
testObjectDiffusionOutbound :: forall objectId object (m :: * -> *).
(Ord objectId, Show objectId, Monad m,
 WithCaughtUpDetection objectId) =>
Tracer m (TraceObjectDiffusionTestImplem objectId object)
-> (object -> objectId)
-> Word16
-> [object]
-> ObjectDiffusionOutbound objectId object m ()
testObjectDiffusionOutbound Tracer m (TraceObjectDiffusionTestImplem objectId object)
tracer object -> objectId
objectId Word16
maxUnacked =
  m (OutboundStIdle objectId object m ())
-> ObjectDiffusionOutbound objectId object m ()
forall objectId object (m :: * -> *) a.
m (OutboundStIdle objectId object m a)
-> ObjectDiffusionOutbound objectId object m a
ObjectDiffusionOutbound (m (OutboundStIdle objectId object m ())
 -> ObjectDiffusionOutbound objectId object m ())
-> ([object] -> m (OutboundStIdle objectId object m ()))
-> [object]
-> ObjectDiffusionOutbound objectId object m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. OutboundStIdle objectId object m ()
-> m (OutboundStIdle objectId object m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (OutboundStIdle objectId object m ()
 -> m (OutboundStIdle objectId object m ()))
-> ([object] -> OutboundStIdle objectId object m ())
-> [object]
-> m (OutboundStIdle objectId object m ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StrictSeq objectId
-> Map objectId object
-> [object]
-> OutboundStIdle objectId object m ()
outboundIdle StrictSeq objectId
forall a. StrictSeq a
Seq.empty Map objectId object
forall k a. Map k a
Map.empty
  where
    outboundIdle :: StrictSeq objectId
           -> Map objectId object
           -> [object]
           -> OutboundStIdle objectId object m ()
    outboundIdle :: StrictSeq objectId
-> Map objectId object
-> [object]
-> OutboundStIdle objectId object m ()
outboundIdle !StrictSeq objectId
unackedSeq !Map objectId object
unackedMap [object]
remainingObjects =
        Bool
-> OutboundStIdle objectId object m ()
-> OutboundStIdle objectId object m ()
forall a. HasCallStack => Bool -> a -> a
assert Bool
invariant
        OutboundStIdle {
          SingBlockingStyle blocking
-> NumObjectIdsAck
-> NumObjectIdsReq
-> m (OutboundStObjectIds blocking objectId object m ())
forall (blocking :: StBlockingStyle).
SingBlockingStyle blocking
-> NumObjectIdsAck
-> NumObjectIdsReq
-> m (OutboundStObjectIds blocking objectId object m ())
recvMsgRequestObjectIds :: forall (blocking :: StBlockingStyle).
SingBlockingStyle blocking
-> NumObjectIdsAck
-> NumObjectIdsReq
-> m (OutboundStObjectIds blocking objectId object m ())
recvMsgRequestObjectIds :: forall (blocking :: StBlockingStyle).
SingBlockingStyle blocking
-> NumObjectIdsAck
-> NumObjectIdsReq
-> m (OutboundStObjectIds blocking objectId object m ())
recvMsgRequestObjectIds,
          [objectId] -> m (OutboundStObjects objectId object m ())
recvMsgRequestObjects :: [objectId] -> m (OutboundStObjects objectId object m ())
recvMsgRequestObjects :: [objectId] -> m (OutboundStObjects objectId object m ())
recvMsgRequestObjects,
          m ()
recvMsgDone :: m ()
recvMsgDone :: m ()
recvMsgDone
        }
        where
          invariant :: Bool
invariant =
            (object -> () -> Bool)
-> Map objectId object -> Map objectId () -> Bool
forall k a b.
Ord k =>
(a -> b -> Bool) -> Map k a -> Map k b -> Bool
Map.isSubmapOfBy
              (\object
_ ()
_ -> Bool
True)
              Map objectId object
unackedMap
              ([(objectId, ())] -> Map objectId ()
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList [ (objectId
x, ()) | objectId
x <- StrictSeq objectId -> [objectId]
forall a. StrictSeq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
Foldable.toList StrictSeq objectId
unackedSeq ])

          recvMsgRequestObjectIds :: forall blocking.
                                     SingBlockingStyle blocking
                                  -> NumObjectIdsAck
                                  -> NumObjectIdsReq
                                  -> m (OutboundStObjectIds blocking objectId object m ())
          recvMsgRequestObjectIds :: forall (blocking :: StBlockingStyle).
SingBlockingStyle blocking
-> NumObjectIdsAck
-> NumObjectIdsReq
-> m (OutboundStObjectIds blocking objectId object m ())
recvMsgRequestObjectIds SingBlockingStyle blocking
blocking NumObjectIdsAck
ackNo NumObjectIdsReq
reqNo = do
            Tracer m (TraceObjectDiffusionTestImplem objectId object)
-> TraceObjectDiffusionTestImplem objectId object -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceObjectDiffusionTestImplem objectId object)
tracer (TraceObjectDiffusionTestImplem objectId object -> m ())
-> TraceObjectDiffusionTestImplem objectId object -> m ()
forall a b. (a -> b) -> a -> b
$
              StrictSeq objectId
-> Map objectId object
-> [object]
-> NumObjectIdsAck
-> NumObjectIdsReq
-> TraceObjectDiffusionTestImplem objectId object
forall objectId object.
StrictSeq objectId
-> Map objectId object
-> [object]
-> NumObjectIdsAck
-> NumObjectIdsReq
-> TraceObjectDiffusionTestImplem objectId object
EventRecvMsgRequestObjectIds
                StrictSeq objectId
unackedSeq Map objectId object
unackedMap [object]
remainingObjects NumObjectIdsAck
ackNo NumObjectIdsReq
reqNo

            Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (NumObjectIdsAck
ackNo NumObjectIdsAck -> NumObjectIdsAck -> Bool
forall a. Ord a => a -> a -> Bool
> Int -> NumObjectIdsAck
forall a b. (Integral a, Num b) => a -> b
fromIntegral (StrictSeq objectId -> Int
forall a. StrictSeq a -> Int
Seq.length StrictSeq objectId
unackedSeq)) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
              String -> m ()
forall a. HasCallStack => String -> a
error (String -> m ()) -> String -> m ()
forall a b. (a -> b) -> a -> b
$ String
"testObjectDiffusionOutbound.recvMsgRequestObjectIds: "
                   String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
"peer acknowledged more object IDs than possible"

            Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (  Int -> Word16
forall a b. (Integral a, Num b) => a -> b
fromIntegral (StrictSeq objectId -> Int
forall a. StrictSeq a -> Int
Seq.length StrictSeq objectId
unackedSeq)
                  Word16 -> Word16 -> Word16
forall a. Num a => a -> a -> a
- NumObjectIdsAck -> Word16
getNumObjectIdsAck NumObjectIdsAck
ackNo
                  Word16 -> Word16 -> Word16
forall a. Num a => a -> a -> a
+ NumObjectIdsReq -> Word16
getNumObjectIdsReq NumObjectIdsReq
reqNo
                  Word16 -> Word16 -> Bool
forall a. Ord a => a -> a -> Bool
> Word16
maxUnacked) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
              String -> m ()
forall a. HasCallStack => String -> a
error (String -> m ()) -> String -> m ()
forall a b. (a -> b) -> a -> b
$ String
"testObjectDiffusionOutbound.recvMsgRequestObjectIds: "
                   String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
"peer requested more object IDs than permitted"

            let unackedSeq' :: StrictSeq objectId
unackedSeq' = Int -> StrictSeq objectId -> StrictSeq objectId
forall a. Int -> StrictSeq a -> StrictSeq a
Seq.drop (NumObjectIdsAck -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral NumObjectIdsAck
ackNo) StrictSeq objectId
unackedSeq
                unackedMap' :: Map objectId object
unackedMap' = (Map objectId object -> objectId -> Map objectId object)
-> Map objectId object -> StrictSeq objectId -> Map objectId object
forall b a. (b -> a -> b) -> b -> StrictSeq a -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
Foldable.foldl' ((objectId -> Map objectId object -> Map objectId object)
-> Map objectId object -> objectId -> Map objectId object
forall a b c. (a -> b -> c) -> b -> a -> c
flip objectId -> Map objectId object -> Map objectId object
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete) Map objectId object
unackedMap
                                (Int -> StrictSeq objectId -> StrictSeq objectId
forall a. Int -> StrictSeq a -> StrictSeq a
Seq.take (NumObjectIdsAck -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral NumObjectIdsAck
ackNo) StrictSeq objectId
unackedSeq)

            case SingBlockingStyle blocking
blocking of
              SingBlockingStyle blocking
SingBlocking | Bool -> Bool
not (StrictSeq objectId -> Bool
forall a. StrictSeq a -> Bool
Seq.null StrictSeq objectId
unackedSeq')
                -> String -> m ()
forall a. HasCallStack => String -> a
error (String -> m ()) -> String -> m ()
forall a b. (a -> b) -> a -> b
$ String
"testObjectDiffusionOutbound.recvMsgRequestObjectIds: "
                        String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
"peer made a blocking request for more object IDs when "
                        String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
"there are still unacknowledged object IDs."
              SingBlockingStyle blocking
_ -> () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()

            -- This example is eager, it always provides as many as asked for,
            -- up to the number remaining available.
            let unackedExtra :: [object]
unackedExtra      = Int -> [object] -> [object]
forall a. Int -> [a] -> [a]
take (NumObjectIdsReq -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral NumObjectIdsReq
reqNo) [object]
remainingObjects
                unackedSeq'' :: StrictSeq objectId
unackedSeq''      = StrictSeq objectId
unackedSeq'
                                 StrictSeq objectId -> StrictSeq objectId -> StrictSeq objectId
forall a. Semigroup a => a -> a -> a
<> [objectId] -> StrictSeq objectId
forall a. [a] -> StrictSeq a
Seq.fromList ((object -> objectId) -> [object] -> [objectId]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap object -> objectId
objectId [object]
unackedExtra)
                unackedMap'' :: Map objectId object
unackedMap''      = Map objectId object
unackedMap'
                                 Map objectId object -> Map objectId object -> Map objectId object
forall a. Semigroup a => a -> a -> a
<> [(objectId, object)] -> Map objectId object
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList [ (object -> objectId
objectId object
obj, object
obj)
                                                 | object
obj <- [object]
unackedExtra ]
                remainingObjects' :: [object]
remainingObjects' = Int -> [object] -> [object]
forall a. Int -> [a] -> [a]
drop (NumObjectIdsReq -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral NumObjectIdsReq
reqNo) [object]
remainingObjects

            OutboundStObjectIds blocking objectId object m ()
-> m (OutboundStObjectIds blocking objectId object m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (OutboundStObjectIds blocking objectId object m ()
 -> m (OutboundStObjectIds blocking objectId object m ()))
-> OutboundStObjectIds blocking objectId object m ()
-> m (OutboundStObjectIds blocking objectId object m ())
forall a b. (a -> b) -> a -> b
$! case (SingBlockingStyle blocking
blocking, [object]
unackedExtra) of
              (SingBlockingStyle blocking
SingBlocking, []) ->
                -- | In the production-ready implementation that lives in
                -- `ouroboros-consensus`, we would block on waiting new objects
                -- from the ObjectPool here.
                -- But in this test implementation, we use 'caughtUpSentinel'
                -- to signal that there are no more objects to send, so the
                -- inbound side knows it is caught-up and can terminate the
                -- protocol gracefully.
                BlockingReplyList blocking objectId
-> OutboundStIdle objectId object m ()
-> OutboundStObjectIds blocking objectId object m ()
forall (blocking :: StBlockingStyle) objectId object (m :: * -> *)
       a.
BlockingReplyList blocking objectId
-> OutboundStIdle objectId object m a
-> OutboundStObjectIds blocking objectId object m a
SendMsgReplyObjectIds
                  (NonEmpty objectId -> BlockingReplyList 'StBlocking objectId
forall a. NonEmpty a -> BlockingReplyList 'StBlocking a
BlockingReply NonEmpty objectId
forall objectId.
WithCaughtUpDetection objectId =>
NonEmpty objectId
caughtUpSentinel)
                  (StrictSeq objectId
-> Map objectId object
-> [object]
-> OutboundStIdle objectId object m ()
outboundIdle StrictSeq objectId
unackedSeq'' Map objectId object
unackedMap'' [object]
remainingObjects')

              (SingBlockingStyle blocking
SingBlocking, object
obj : [object]
objs) ->
                BlockingReplyList blocking objectId
-> OutboundStIdle objectId object m ()
-> OutboundStObjectIds blocking objectId object m ()
forall (blocking :: StBlockingStyle) objectId object (m :: * -> *)
       a.
BlockingReplyList blocking objectId
-> OutboundStIdle objectId object m a
-> OutboundStObjectIds blocking objectId object m a
SendMsgReplyObjectIds
                  (NonEmpty objectId -> BlockingReplyList 'StBlocking objectId
forall a. NonEmpty a -> BlockingReplyList 'StBlocking a
BlockingReply ((object -> objectId) -> NonEmpty object -> NonEmpty objectId
forall a b. (a -> b) -> NonEmpty a -> NonEmpty b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap object -> objectId
objectId (object
obj object -> [object] -> NonEmpty object
forall a. a -> [a] -> NonEmpty a
:| [object]
objs)))
                  (StrictSeq objectId
-> Map objectId object
-> [object]
-> OutboundStIdle objectId object m ()
outboundIdle StrictSeq objectId
unackedSeq'' Map objectId object
unackedMap'' [object]
remainingObjects')

              (SingBlockingStyle blocking
SingNonBlocking, [object]
objs) ->
                BlockingReplyList blocking objectId
-> OutboundStIdle objectId object m ()
-> OutboundStObjectIds blocking objectId object m ()
forall (blocking :: StBlockingStyle) objectId object (m :: * -> *)
       a.
BlockingReplyList blocking objectId
-> OutboundStIdle objectId object m a
-> OutboundStObjectIds blocking objectId object m a
SendMsgReplyObjectIds
                  ([objectId] -> BlockingReplyList 'StNonBlocking objectId
forall a. [a] -> BlockingReplyList 'StNonBlocking a
NonBlockingReply ((object -> objectId) -> [object] -> [objectId]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap object -> objectId
objectId [object]
objs))
                  (StrictSeq objectId
-> Map objectId object
-> [object]
-> OutboundStIdle objectId object m ()
outboundIdle StrictSeq objectId
unackedSeq'' Map objectId object
unackedMap'' [object]
remainingObjects')

          recvMsgRequestObjects :: [objectId]
                                -> m (OutboundStObjects objectId object m ())
          recvMsgRequestObjects :: [objectId] -> m (OutboundStObjects objectId object m ())
recvMsgRequestObjects [objectId]
objectIds = do
            Tracer m (TraceObjectDiffusionTestImplem objectId object)
-> TraceObjectDiffusionTestImplem objectId object -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceObjectDiffusionTestImplem objectId object)
tracer (TraceObjectDiffusionTestImplem objectId object -> m ())
-> TraceObjectDiffusionTestImplem objectId object -> m ()
forall a b. (a -> b) -> a -> b
$
              StrictSeq objectId
-> Map objectId object
-> [object]
-> [objectId]
-> TraceObjectDiffusionTestImplem objectId object
forall objectId object.
StrictSeq objectId
-> Map objectId object
-> [object]
-> [objectId]
-> TraceObjectDiffusionTestImplem objectId object
EventRecvMsgRequestObjects
                StrictSeq objectId
unackedSeq Map objectId object
unackedMap [object]
remainingObjects [objectId]
objectIds
            case [ objectId
objId | objectId
objId <- [objectId]
objectIds, objectId
objId objectId -> Map objectId object -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`Map.notMember` Map objectId object
unackedMap ] of
              [] -> OutboundStObjects objectId object m ()
-> m (OutboundStObjects objectId object m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([object]
-> OutboundStIdle objectId object m ()
-> OutboundStObjects objectId object m ()
forall object objectId (m :: * -> *) a.
[object]
-> OutboundStIdle objectId object m a
-> OutboundStObjects objectId object m a
SendMsgReplyObjects [object]
objects OutboundStIdle objectId object m ()
outbound')
                where
                  objects :: [object]
objects     = (objectId -> object) -> [objectId] -> [object]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Map objectId object
unackedMap Map objectId object -> objectId -> object
forall k a. Ord k => Map k a -> k -> a
Map.!) [objectId]
objectIds
                  outbound' :: OutboundStIdle objectId object m ()
outbound'   = StrictSeq objectId
-> Map objectId object
-> [object]
-> OutboundStIdle objectId object m ()
outboundIdle StrictSeq objectId
unackedSeq Map objectId object
unackedMap' [object]
remainingObjects
                  unackedMap' :: Map objectId object
unackedMap' = (objectId -> Map objectId object -> Map objectId object)
-> Map objectId object -> [objectId] -> Map objectId object
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr objectId -> Map objectId object -> Map objectId object
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete Map objectId object
unackedMap [objectId]
objectIds
                  -- Here we remove from the map, while the seq stays unchanged.
                  -- This enforces that each object can be requested at most once.

              [objectId]
missing -> String -> m (OutboundStObjects objectId object m ())
forall a. HasCallStack => String -> a
error (String -> m (OutboundStObjects objectId object m ()))
-> String -> m (OutboundStObjects objectId object m ())
forall a b. (a -> b) -> a -> b
$ String
"testObjectDiffusionOutbound.recvMsgRequestObjects: "
                              String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
"requested missing ObjectIds: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> [objectId] -> String
forall a. Show a => a -> String
show [objectId]
missing

          recvMsgDone :: m ()
          recvMsgDone :: m ()
recvMsgDone = () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()


--
-- Inbound implementation
--

data InboundState objectId object = InboundState {
    forall objectId object.
InboundState objectId object -> NumObjectIdsReq
requestedObjectIdsInFlight :: NumObjectIdsReq,
    forall objectId object.
InboundState objectId object -> StrictSeq objectId
unacknowledgedObjectIds    :: StrictSeq objectId,
    forall objectId object.
InboundState objectId object -> Set objectId
availableObjectIds         :: Set objectId,
    forall objectId object.
InboundState objectId object -> Map objectId (Maybe object)
bufferedObjects            :: Map objectId (Maybe object),
    forall objectId object.
InboundState objectId object -> NumObjectIdsAck
numObjectsToAcknowledge    :: NumObjectIdsAck
  }
  deriving Int -> InboundState objectId object -> ShowS
[InboundState objectId object] -> ShowS
InboundState objectId object -> String
(Int -> InboundState objectId object -> ShowS)
-> (InboundState objectId object -> String)
-> ([InboundState objectId object] -> ShowS)
-> Show (InboundState objectId object)
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
forall objectId object.
(Show objectId, Show object) =>
Int -> InboundState objectId object -> ShowS
forall objectId object.
(Show objectId, Show object) =>
[InboundState objectId object] -> ShowS
forall objectId object.
(Show objectId, Show object) =>
InboundState objectId object -> String
$cshowsPrec :: forall objectId object.
(Show objectId, Show object) =>
Int -> InboundState objectId object -> ShowS
showsPrec :: Int -> InboundState objectId object -> ShowS
$cshow :: forall objectId object.
(Show objectId, Show object) =>
InboundState objectId object -> String
show :: InboundState objectId object -> String
$cshowList :: forall objectId object.
(Show objectId, Show object) =>
[InboundState objectId object] -> ShowS
showList :: [InboundState objectId object] -> ShowS
Show


initialInboundState :: InboundState objectId object
initialInboundState :: forall objectId object. InboundState objectId object
initialInboundState = NumObjectIdsReq
-> StrictSeq objectId
-> Set objectId
-> Map objectId (Maybe object)
-> NumObjectIdsAck
-> InboundState objectId object
forall objectId object.
NumObjectIdsReq
-> StrictSeq objectId
-> Set objectId
-> Map objectId (Maybe object)
-> NumObjectIdsAck
-> InboundState objectId object
InboundState NumObjectIdsReq
0 StrictSeq objectId
forall a. StrictSeq a
Seq.empty Set objectId
forall a. Set a
Set.empty Map objectId (Maybe object)
forall k a. Map k a
Map.empty NumObjectIdsAck
0


testObjectDiffusionInbound
  :: forall objectId object m.
     (Ord objectId, WithCaughtUpDetection objectId)
  => Tracer m (TraceObjectDiffusionTestImplem objectId object)
  -> (object -> objectId)
  -> Word16  -- ^ Maximum number of unacknowledged object IDs allowed
  -> Word16  -- ^ Maximum number of object IDs to request in any one go
  -> Word16  -- ^ Maximum number of objects to request in any one go
  -> ObjectDiffusionInboundPipelined objectId object m [object]
testObjectDiffusionInbound :: forall objectId object (m :: * -> *).
(Ord objectId, WithCaughtUpDetection objectId) =>
Tracer m (TraceObjectDiffusionTestImplem objectId object)
-> (object -> objectId)
-> Word16
-> Word16
-> Word16
-> ObjectDiffusionInboundPipelined objectId object m [object]
testObjectDiffusionInbound
  Tracer m (TraceObjectDiffusionTestImplem objectId object)
_tracer
  object -> objectId
objectId
  Word16
maxUnacked
  Word16
maxObjectIdsToRequest
  Word16
maxObjectsToRequest =
    InboundStIdle 'Z objectId object m [object]
-> ObjectDiffusionInboundPipelined objectId object m [object]
forall objectId object (m :: * -> *) a.
InboundStIdle 'Z objectId object m a
-> ObjectDiffusionInboundPipelined objectId object m a
ObjectDiffusionInboundPipelined ([object]
-> Nat 'Z
-> InboundState objectId object
-> InboundStIdle 'Z objectId object m [object]
forall (n :: N).
[object]
-> Nat n
-> InboundState objectId object
-> InboundStIdle n objectId object m [object]
inboundIdle [] Nat 'Z
forall (n :: N). ('Z ~ n) => Nat n
Zero InboundState objectId object
forall objectId object. InboundState objectId object
initialInboundState)
  where
    inboundIdle :: forall (n :: N).
                  [object]
               -> Nat n
               -> InboundState objectId object
               -> InboundStIdle n objectId object m [object]
    inboundIdle :: forall (n :: N).
[object]
-> Nat n
-> InboundState objectId object
-> InboundStIdle n objectId object m [object]
inboundIdle [object]
accum Nat n
Zero InboundState objectId object
st
        -- There are no replies in flight, but we do know some more objects we
        -- can ask for, so lets ask for them and more object IDs.
      | InboundState objectId object -> Bool
forall k. InboundState k object -> Bool
canRequestMoreObjects InboundState objectId object
st
      = [object]
-> Nat n
-> InboundState objectId object
-> InboundStIdle n objectId object m [object]
forall (n :: N).
[object]
-> Nat n
-> InboundState objectId object
-> InboundStIdle n objectId object m [object]
inboundReqObjects [object]
accum Nat n
forall (n :: N). ('Z ~ n) => Nat n
Zero InboundState objectId object
st

        -- There's no replies in flight, and we have no more objects we can ask
        -- for so the only remaining thing to do is to ask for more object IDs.
        -- Since this is the only thing to do now, we make this a blocking call.
      | Bool
otherwise
      , let numObjectIdsToRequest :: NumObjectIdsReq
numObjectIdsToRequest =
              Word16 -> NumObjectIdsReq
NumObjectIdsReq (Word16 -> NumObjectIdsReq) -> Word16 -> NumObjectIdsReq
forall a b. (a -> b) -> a -> b
$ Word16
maxObjectIdsToRequest Word16 -> Word16 -> Word16
forall a. Ord a => a -> a -> a
`min` Word16
maxUnacked
      = Bool
-> InboundStIdle n objectId object m [object]
-> InboundStIdle n objectId object m [object]
forall a. HasCallStack => Bool -> a -> a
assert (InboundState objectId object -> NumObjectIdsReq
forall objectId object.
InboundState objectId object -> NumObjectIdsReq
requestedObjectIdsInFlight InboundState objectId object
st NumObjectIdsReq -> NumObjectIdsReq -> Bool
forall a. Eq a => a -> a -> Bool
== NumObjectIdsReq
0
             Bool -> Bool -> Bool
&& StrictSeq objectId -> Bool
forall a. StrictSeq a -> Bool
Seq.null (InboundState objectId object -> StrictSeq objectId
forall objectId object.
InboundState objectId object -> StrictSeq objectId
unacknowledgedObjectIds InboundState objectId object
st)
             Bool -> Bool -> Bool
&& Set objectId -> Bool
forall a. Set a -> Bool
Set.null (InboundState objectId object -> Set objectId
forall objectId object.
InboundState objectId object -> Set objectId
availableObjectIds InboundState objectId object
st)
             Bool -> Bool -> Bool
&& Map objectId (Maybe object) -> Bool
forall k a. Map k a -> Bool
Map.null (InboundState objectId object -> Map objectId (Maybe object)
forall objectId object.
InboundState objectId object -> Map objectId (Maybe object)
bufferedObjects InboundState objectId object
st)) (InboundStIdle n objectId object m [object]
 -> InboundStIdle n objectId object m [object])
-> InboundStIdle n objectId object m [object]
-> InboundStIdle n objectId object m [object]
forall a b. (a -> b) -> a -> b
$
        NumObjectIdsAck
-> NumObjectIdsReq
-> (NonEmpty objectId
    -> InboundStIdle 'Z objectId object m [object])
-> InboundStIdle 'Z objectId object m [object]
forall objectId object (m :: * -> *) a.
NumObjectIdsAck
-> NumObjectIdsReq
-> (NonEmpty objectId -> InboundStIdle 'Z objectId object m a)
-> InboundStIdle 'Z objectId object m a
SendMsgRequestObjectIdsBlocking
          (InboundState objectId object -> NumObjectIdsAck
forall objectId object.
InboundState objectId object -> NumObjectIdsAck
numObjectsToAcknowledge InboundState objectId object
st)
          NumObjectIdsReq
numObjectIdsToRequest
          -- We use 'ifCaughtUp' here to detect if the outbound side ha
          -- signaled that there are no more objects to send, in which case we
          -- terminate the protocol gracefully using 'SendMsgDone'.
          (InboundStIdle 'Z objectId object m [object]
-> (NonEmpty objectId
    -> InboundStIdle 'Z objectId object m [object])
-> NonEmpty objectId
-> InboundStIdle 'Z objectId object m [object]
forall objectId object (m :: * -> *) a.
WithCaughtUpDetection objectId =>
InboundStIdle 'Z objectId object m a
-> (NonEmpty objectId -> InboundStIdle 'Z objectId object m a)
-> NonEmpty objectId
-> InboundStIdle 'Z objectId object m a
forall object (m :: * -> *) a.
InboundStIdle 'Z objectId object m a
-> (NonEmpty objectId -> InboundStIdle 'Z objectId object m a)
-> NonEmpty objectId
-> InboundStIdle 'Z objectId object m a
ifCaughtUp
            ([object] -> InboundStIdle 'Z objectId object m [object]
forall a objectId object (m :: * -> *).
a -> InboundStIdle 'Z objectId object m a
SendMsgDone [object]
accum)
            ([object]
-> Nat 'Z
-> InboundState objectId object
-> Collect objectId object
-> InboundStIdle 'Z objectId object m [object]
forall (n :: N).
[object]
-> Nat n
-> InboundState objectId object
-> Collect objectId object
-> InboundStIdle n objectId object m [object]
handleReply [object]
accum Nat 'Z
forall (n :: N). ('Z ~ n) => Nat n
Zero InboundState objectId object
st {
                    numObjectsToAcknowledge    = 0,
                    requestedObjectIdsInFlight = numObjectIdsToRequest
                  }
                  (Collect objectId object
 -> InboundStIdle 'Z objectId object m [object])
-> (NonEmpty objectId -> Collect objectId object)
-> NonEmpty objectId
-> InboundStIdle 'Z objectId object m [object]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. NumObjectIdsReq -> [objectId] -> Collect objectId object
forall objectId object.
NumObjectIdsReq -> [objectId] -> Collect objectId object
CollectObjectIds NumObjectIdsReq
numObjectIdsToRequest
                  ([objectId] -> Collect objectId object)
-> (NonEmpty objectId -> [objectId])
-> NonEmpty objectId
-> Collect objectId object
forall b c a. (b -> c) -> (a -> b) -> a -> c
. NonEmpty objectId -> [objectId]
forall a. NonEmpty a -> [a]
NonEmpty.toList)
          )

    inboundIdle [object]
accum (Succ Nat n
n) InboundState objectId object
st
        -- We have replies in flight and we should eagerly collect them if
        -- available, but there are objects to request too so we should not
        -- block waiting for replies.
        --
        -- Having requested more objects, we opportunistically ask for more
        -- object IDs in a non-blocking way. This is how we pipeline asking for
        -- both objects and object IDs.
        --
        -- It's important not to pipeline more requests for object IDs when we
        -- have no objects to ask for, since (with no other guard) this will
        -- put us into a busy-polling loop.
        --
      | InboundState objectId object -> Bool
forall k. InboundState k object -> Bool
canRequestMoreObjects InboundState objectId object
st
      = Maybe (InboundStIdle ('S n) objectId object m [object])
-> (Collect objectId object
    -> InboundStIdle n objectId object m [object])
-> InboundStIdle ('S n) objectId object m [object]
forall (n1 :: N) objectId object (m :: * -> *) a.
Maybe (InboundStIdle ('S n1) objectId object m a)
-> (Collect objectId object
    -> InboundStIdle n1 objectId object m a)
-> InboundStIdle ('S n1) objectId object m a
CollectPipelined
          (InboundStIdle ('S n) objectId object m [object]
-> Maybe (InboundStIdle ('S n) objectId object m [object])
forall a. a -> Maybe a
Just ([object]
-> Nat ('S n)
-> InboundState objectId object
-> InboundStIdle ('S n) objectId object m [object]
forall (n :: N).
[object]
-> Nat n
-> InboundState objectId object
-> InboundStIdle n objectId object m [object]
inboundReqObjects [object]
accum (Nat n -> Nat ('S n)
forall (m :: N) (n :: N). (m ~ 'S n) => Nat n -> Nat m
Succ Nat n
n) InboundState objectId object
st))
          ([object]
-> Nat n
-> InboundState objectId object
-> Collect objectId object
-> InboundStIdle n objectId object m [object]
forall (n :: N).
[object]
-> Nat n
-> InboundState objectId object
-> Collect objectId object
-> InboundStIdle n objectId object m [object]
handleReply [object]
accum Nat n
n InboundState objectId object
st)

        -- In this case there is nothing else to do so we block until we
        -- collect a reply.
      | Bool
otherwise
      = Maybe (InboundStIdle ('S n) objectId object m [object])
-> (Collect objectId object
    -> InboundStIdle n objectId object m [object])
-> InboundStIdle ('S n) objectId object m [object]
forall (n1 :: N) objectId object (m :: * -> *) a.
Maybe (InboundStIdle ('S n1) objectId object m a)
-> (Collect objectId object
    -> InboundStIdle n1 objectId object m a)
-> InboundStIdle ('S n1) objectId object m a
CollectPipelined
          Maybe (InboundStIdle ('S n) objectId object m [object])
forall a. Maybe a
Nothing
          ([object]
-> Nat n
-> InboundState objectId object
-> Collect objectId object
-> InboundStIdle n objectId object m [object]
forall (n :: N).
[object]
-> Nat n
-> InboundState objectId object
-> Collect objectId object
-> InboundStIdle n objectId object m [object]
handleReply [object]
accum Nat n
n InboundState objectId object
st)

    canRequestMoreObjects :: InboundState k object -> Bool
    canRequestMoreObjects :: forall k. InboundState k object -> Bool
canRequestMoreObjects InboundState k object
st =
        Bool -> Bool
not (Set k -> Bool
forall a. Set a -> Bool
Set.null (InboundState k object -> Set k
forall objectId object.
InboundState objectId object -> Set objectId
availableObjectIds InboundState k object
st))

    handleReply :: forall (n :: N).
                   [object]
                -> Nat n
                -> InboundState objectId object
                -> Collect objectId object
                -> InboundStIdle n objectId object m [object]
    handleReply :: forall (n :: N).
[object]
-> Nat n
-> InboundState objectId object
-> Collect objectId object
-> InboundStIdle n objectId object m [object]
handleReply [object]
accum Nat n
n InboundState objectId object
st (CollectObjectIds NumObjectIdsReq
reqNo [objectId]
objectIds) =
      -- Upon receiving a batch of new object IDs we extend our available set,
      -- and extended the unacknowledged sequence.
      [object]
-> Nat n
-> InboundState objectId object
-> InboundStIdle n objectId object m [object]
forall (n :: N).
[object]
-> Nat n
-> InboundState objectId object
-> InboundStIdle n objectId object m [object]
inboundIdle [object]
accum Nat n
n InboundState objectId object
st {
        requestedObjectIdsInFlight = requestedObjectIdsInFlight st - reqNo,
        unacknowledgedObjectIds    = unacknowledgedObjectIds st
                                  <> Seq.fromList objectIds,
        availableObjectIds         = availableObjectIds st
                                  <> Set.fromList objectIds
      }

    handleReply [object]
accum Nat n
n InboundState objectId object
st (CollectObjects [objectId]
objectIds [object]
objects) =
      -- When we receive a batch of objects, in general we get a subset of those
      -- that we asked for, with the remainder now deemed unnecessary.
      -- But we still have to acknowledge the object IDs we were given. This
      -- combined with the fact that we request objects out of order means our
      -- bufferedObjects has to track all the object IDs we asked for, even
      -- though not all have replies.
      --
      -- We have to update the unacknowledgedObjectIds here eagerly and not
      -- delay it to inboundReqObjects, otherwise we could end up blocking in
      -- inboundIdle on more pipelined results rather than being able to move on.
      [object]
-> Nat n
-> InboundState objectId object
-> InboundStIdle n objectId object m [object]
forall (n :: N).
[object]
-> Nat n
-> InboundState objectId object
-> InboundStIdle n objectId object m [object]
inboundIdle [object]
accum' Nat n
n InboundState objectId object
st {
        bufferedObjects         = bufferedObjects'',
        unacknowledgedObjectIds = unacknowledgedObjectIds',
        numObjectsToAcknowledge = numObjectsToAcknowledge st
                                + fromIntegral (Seq.length acknowledgedObjectIds)
      }
      where
        objectIdsRequestedWithObjectsReceived :: [(objectId, Maybe object)]
        objectIdsRequestedWithObjectsReceived :: [(objectId, Maybe object)]
objectIdsRequestedWithObjectsReceived =
          [ (objectId
objId, Maybe object
mbObj)
          | let objsMap :: Map objectId object
                objsMap :: Map objectId object
objsMap  = [(objectId, object)] -> Map objectId object
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList [ (object -> objectId
objectId object
obj, object
obj) | object
obj <- [object]
objects ]
          , objectId
objId <- [objectId]
objectIds
          , let !mbObj :: Maybe object
mbObj = objectId -> Map objectId object -> Maybe object
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup objectId
objId Map objectId object
objsMap
          ]

        bufferedObjects' :: Map objectId (Maybe object)
bufferedObjects'  = InboundState objectId object -> Map objectId (Maybe object)
forall objectId object.
InboundState objectId object -> Map objectId (Maybe object)
bufferedObjects InboundState objectId object
st
                         Map objectId (Maybe object)
-> Map objectId (Maybe object) -> Map objectId (Maybe object)
forall a. Semigroup a => a -> a -> a
<> [(objectId, Maybe object)] -> Map objectId (Maybe object)
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList [(objectId, Maybe object)]
objectIdsRequestedWithObjectsReceived

        -- Check if having received more objects we can now confirm any (in
        -- strict order in the unacknowledgedObjectIds sequence).
        (StrictSeq objectId
acknowledgedObjectIds, StrictSeq objectId
unacknowledgedObjectIds') =
          (objectId -> Bool)
-> StrictSeq objectId -> (StrictSeq objectId, StrictSeq objectId)
forall a. (a -> Bool) -> StrictSeq a -> (StrictSeq a, StrictSeq a)
Seq.spanl (objectId -> Map objectId (Maybe object) -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`Map.member` Map objectId (Maybe object)
bufferedObjects') (InboundState objectId object -> StrictSeq objectId
forall objectId object.
InboundState objectId object -> StrictSeq objectId
unacknowledgedObjectIds InboundState objectId object
st)

        -- If so we can add the acknowledged objects to our accumulating result
        accum' :: [object]
accum' = [object]
accum
              [object] -> [object] -> [object]
forall a. Semigroup a => a -> a -> a
<> (objectId -> [object] -> [object])
-> [object] -> StrictSeq objectId -> [object]
forall a b. (a -> b -> b) -> b -> StrictSeq a -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr
                   (\objectId
objId [object]
r -> [object] -> (object -> [object]) -> Maybe object -> [object]
forall b a. b -> (a -> b) -> Maybe a -> b
maybe [object]
r (object -> [object] -> [object]
forall a. a -> [a] -> [a]
:[object]
r) (Map objectId (Maybe object)
bufferedObjects' Map objectId (Maybe object) -> objectId -> Maybe object
forall k a. Ord k => Map k a -> k -> a
Map.! objectId
objId))
                   []
                   StrictSeq objectId
acknowledgedObjectIds

        -- And remove acknowledged objects from our buffer
        bufferedObjects'' :: Map objectId (Maybe object)
bufferedObjects'' =
          (Map objectId (Maybe object)
 -> objectId -> Map objectId (Maybe object))
-> Map objectId (Maybe object)
-> StrictSeq objectId
-> Map objectId (Maybe object)
forall b a. (b -> a -> b) -> b -> StrictSeq a -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
Foldable.foldl' ((objectId
 -> Map objectId (Maybe object) -> Map objectId (Maybe object))
-> Map objectId (Maybe object)
-> objectId
-> Map objectId (Maybe object)
forall a b c. (a -> b -> c) -> b -> a -> c
flip objectId
-> Map objectId (Maybe object) -> Map objectId (Maybe object)
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete) Map objectId (Maybe object)
bufferedObjects' StrictSeq objectId
acknowledgedObjectIds

    inboundReqObjects :: forall (n :: N).
                         [object]
                      -> Nat n
                      -> InboundState objectId object
                      -> InboundStIdle n objectId object m [object]
    inboundReqObjects :: forall (n :: N).
[object]
-> Nat n
-> InboundState objectId object
-> InboundStIdle n objectId object m [object]
inboundReqObjects [object]
accum Nat n
n InboundState objectId object
st =
        [objectId]
-> InboundStIdle ('S n) objectId object m [object]
-> InboundStIdle n objectId object m [object]
forall objectId (n :: N) object (m :: * -> *) a.
[objectId]
-> InboundStIdle ('S n) objectId object m a
-> InboundStIdle n objectId object m a
SendMsgRequestObjectsPipelined
          (Set objectId -> [objectId]
forall a. Set a -> [a]
Set.toList Set objectId
objectsToRequest)
          ([object]
-> Nat ('S n)
-> InboundState objectId object
-> InboundStIdle ('S n) objectId object m [object]
forall (n :: N).
[object]
-> Nat n
-> InboundState objectId object
-> InboundStIdle n objectId object m [object]
inboundReqObjectIds [object]
accum (Nat n -> Nat ('S n)
forall (m :: N) (n :: N). (m ~ 'S n) => Nat n -> Nat m
Succ Nat n
n) InboundState objectId object
st {
             availableObjectIds = availableObjectIds'
          })
      where
        (Set objectId
objectsToRequest, Set objectId
availableObjectIds') =
          Int -> Set objectId -> (Set objectId, Set objectId)
forall a. Int -> Set a -> (Set a, Set a)
Set.splitAt (Word16 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word16
maxObjectsToRequest) (InboundState objectId object -> Set objectId
forall objectId object.
InboundState objectId object -> Set objectId
availableObjectIds InboundState objectId object
st)

    inboundReqObjectIds :: forall (n :: N).
                           [object]
                        -> Nat n
                        -> InboundState objectId object
                        -> InboundStIdle n objectId object m [object]
    inboundReqObjectIds :: forall (n :: N).
[object]
-> Nat n
-> InboundState objectId object
-> InboundStIdle n objectId object m [object]
inboundReqObjectIds [object]
accum Nat n
n InboundState objectId object
st
      | NumObjectIdsReq
numObjectIdsToRequest NumObjectIdsReq -> NumObjectIdsReq -> Bool
forall a. Ord a => a -> a -> Bool
> NumObjectIdsReq
0
      = NumObjectIdsAck
-> NumObjectIdsReq
-> InboundStIdle ('S n) objectId object m [object]
-> InboundStIdle n objectId object m [object]
forall (n :: N) objectId object (m :: * -> *) a.
NumObjectIdsAck
-> NumObjectIdsReq
-> InboundStIdle ('S n) objectId object m a
-> InboundStIdle n objectId object m a
SendMsgRequestObjectIdsPipelined
          (InboundState objectId object -> NumObjectIdsAck
forall objectId object.
InboundState objectId object -> NumObjectIdsAck
numObjectsToAcknowledge InboundState objectId object
st)
          NumObjectIdsReq
numObjectIdsToRequest
          ([object]
-> Nat ('S n)
-> InboundState objectId object
-> InboundStIdle ('S n) objectId object m [object]
forall (n :: N).
[object]
-> Nat n
-> InboundState objectId object
-> InboundStIdle n objectId object m [object]
inboundIdle [object]
accum (Nat n -> Nat ('S n)
forall (m :: N) (n :: N). (m ~ 'S n) => Nat n -> Nat m
Succ Nat n
n) InboundState objectId object
st {
             requestedObjectIdsInFlight = requestedObjectIdsInFlight st
                                        + numObjectIdsToRequest,
             numObjectsToAcknowledge    = 0
          })

      | Bool
otherwise
      = [object]
-> Nat n
-> InboundState objectId object
-> InboundStIdle n objectId object m [object]
forall (n :: N).
[object]
-> Nat n
-> InboundState objectId object
-> InboundStIdle n objectId object m [object]
inboundIdle [object]
accum Nat n
n InboundState objectId object
st
      where
        -- This definition is justified by the fact that the
        -- 'numObjectsToAcknowledge' are not included in the
        -- 'unacknowledgedObjectIds'.
        numObjectIdsToRequest :: NumObjectIdsReq
numObjectIdsToRequest =
          Word16 -> NumObjectIdsReq
NumObjectIdsReq (Word16 -> NumObjectIdsReq) -> Word16 -> NumObjectIdsReq
forall a b. (a -> b) -> a -> b
$
                (Word16
maxUnacked
                  Word16 -> Word16 -> Word16
forall a. Num a => a -> a -> a
- Int -> Word16
forall a b. (Integral a, Num b) => a -> b
fromIntegral (StrictSeq objectId -> Int
forall a. StrictSeq a -> Int
Seq.length (InboundState objectId object -> StrictSeq objectId
forall objectId object.
InboundState objectId object -> StrictSeq objectId
unacknowledgedObjectIds InboundState objectId object
st))
                  Word16 -> Word16 -> Word16
forall a. Num a => a -> a -> a
- NumObjectIdsReq -> Word16
getNumObjectIdsReq (InboundState objectId object -> NumObjectIdsReq
forall objectId object.
InboundState objectId object -> NumObjectIdsReq
requestedObjectIdsInFlight InboundState objectId object
st))
          Word16 -> Word16 -> Word16
forall a. Ord a => a -> a -> a
`min` Word16
maxObjectIdsToRequest