{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns        #-}
{-# LANGUAGE RankNTypes            #-}
{-# LANGUAGE TypeFamilies          #-}

module Network.Mux.Egress
  ( muxer
    -- $egress
    -- $servicingsSemantics
  , EgressQueue
  , TranslocationServiceRequest (..)
  , Wanton (..)
  ) where

import Control.Monad
import Data.ByteString.Lazy qualified as BL

import Control.Concurrent.Class.MonadSTM.Strict
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTimer.SI hiding (timeout)

import Network.Mux.Timeout
import Network.Mux.Types

-- $servicingsSemantics
-- = Desired Servicing Semantics
--
-- == /Constructing Fairness/
--
-- In this context we are defining fairness as:
--
--  - no starvation
--  - when presented with equal demand (from a selection of mini
--    protocols) deliver "equal" service.
--
-- Equality here might be in terms of equal service rate of
-- requests (or segmented requests) and/or in terms of effective
-- (SDU) data rates.
--
--
-- Notes:
--
--  1) It is assumed (for a given peer) that bulk delivery of
--     blocks (i.e. in recovery mode) and normal, interactive,
--     operation (e.g. chain following) are mutually exclusive. As
--     such there is no requirement to create a notion of
--     prioritisation between such traffic.
--
--  2) We are assuming that the underlying TCP/IP bearer is managed
--     so that individual Mux-layer PDUs are paced. a) this is necessary
--     to mitigate head-of-line blocking effects (i.e. arbitrary
--     amounts of data accruing in the O/S kernel); b) ensuring that
--     any host egress data rate limits can be respected / enforced.
--
-- == /Current Caveats/
--
-- 1) Not considering how mini-protocol associations are constructed
--    (depending on deployment model this might be resolved within
--    the instantiation of the peer relationship)
--
-- 2) Not yet considered notion of orderly termination - this is not
--    likely to be used in an operational context, but may be needed
--    for test harness use.
--
-- == /Principle of Operation/
--
--
-- Egress direction (mini protocol instance to remote peer)
--
-- The request for service (the demand) from a mini protocol is
-- encapsulated in a `Wanton`, such `Wanton`s are placed in a (finite)
-- queue (e.g. TBMQ) of `TranslocationServiceRequest`s.
--

-- $egress
-- = Egress Path
--
-- > ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐  Every mode per miniprotocol
-- > │ muxDuplex │ │ muxDuplex │ │ muxDuplex │ │ muxDuplex │  has a dedicated thread which
-- > │ Initiator │ │ Responder │ │ Initiator │ │ Responder │  will send ByteStrings of CBOR
-- > │ ChainSync │ │ ChainSync │ │ BlockFetch│ │ BlockFetch│  encoded data.
-- > └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘
-- >       │             │             │             │
-- >       │             │             │             │
-- >       ╰─────────────┴──────┬──────┴─────────────╯
-- >                            │
-- >                    application data
-- >                            │
-- >                         ░░░▼░░
-- >                         ░│  │░         For a given Mux Bearer there is a single egress
-- >                         ░│ci│░         queue shared among all miniprotocols. To ensure
-- >                         ░│cr│░         fairness each miniprotocol can at most have one
-- >                         ░└──┘░         message in the queue, see Desired Servicing
-- >                         ░░░│░░         Semantics.
-- >                           ░│░
-- >                       ░░░░░▼░░░
-- >                       ░┌─────┐░        The egress queue is served by a dedicated thread
-- >                       ░│ mux │░        which chops up the CBOR data into MuxSDUs with at
-- >                       ░└─────┘░        most sduSize bytes of data in them.
-- >                       ░░░░│░░░░
-- >                          ░│░ MuxSDUs
-- >                          ░│░
-- >                  ░░░░░░░░░▼░░░░░░░░░░
-- >                  ░┌────────────────┐░
-- >                  ░│ Bearer.write() │░  Mux Bearer implementation specific write
-- >                  ░└────────────────┘░
-- >                  ░░░░░░░░░│░░░░░░░░░░
-- >                           │ ByteStrings
-- >                           ▼
-- >                           ●

-- | The single bounded queue of outstanding egress demands that the
-- 'muxer' thread services.
type EgressQueue m = StrictTBQueue m (TranslocationServiceRequest m)

-- | A TranslocationServiceRequest is a demand for the translocation
--  of a single mini-protocol message. This message can be of
--  arbitrary (yet bounded) size. This multiplexing layer is
--  responsible for the segmentation of concrete representation into
--  appropriate SDU's for onward transmission.
data TranslocationServiceRequest m =
     TLSRDemand !MiniProtocolNum !MiniProtocolDir !(Wanton m)

-- | A Wanton represents the concrete data to be translocated. Note that
--  the TVar becoming empty indicates that the last fragment of the data
--  has been enqueued on the underlying bearer.
newtype Wanton m = Wanton { want :: StrictTVar m BL.ByteString }


-- | Process the messages from the mini protocols - there is a single
--   shared FIFO that contains the items of work. This is processed so
--   that each active demand gets a `maxSDU`s work of data processed
--   each time it gets to the front of the queue.
--
--   This action loops forever (hence the polymorphic @void@ result):
--   it blocks until a demand is available on the queue, then hands it
--   to 'processSingleWanton'.
muxer
    :: ( MonadAsync m
       , MonadFork m
       , MonadMask m
       , MonadThrow (STM m)
       , MonadTimer m
       )
    => EgressQueue m
    -> Bearer m
    -> m void
muxer egressQueue bearer =
    withTimeoutSerial $ \timeout ->
    forever $ do
      TLSRDemand mpc md d <- atomically $ readTBQueue egressQueue
      processSingleWanton egressQueue bearer timeout mpc md d

-- | Pull a `maxSDU`s worth of data out of the `Wanton` - if there is
-- data remaining requeue the `TranslocationServiceRequest` (this
-- ensures that any other items on the queue will get some service
-- first).
--
-- The extracted fragment is wrapped in an 'SDU' tagged with the given
-- mini-protocol number and direction and passed to the bearer's
-- @write@.
processSingleWanton :: MonadSTM m
                    => EgressQueue m
                    -> Bearer m
                    -> TimeoutFn m
                    -> MiniProtocolNum
                    -> MiniProtocolDir
                    -> Wanton m
                    -> m ()
processSingleWanton egressQueue Bearer { write, sduSize }
                    timeout mpc md wanton = do
    blob <- atomically $ do
      -- extract next SDU
      d <- readTVar (want wanton)
      let (frag, rest) = BL.splitAt (fromIntegral (getSDUSize sduSize)) d
      -- if more to process then enqueue remaining work
      if BL.null rest
        then writeTVar (want wanton) BL.empty
        else do
          -- Note that to preserve bytestream ordering within a given
          -- miniprotocol the readTVar and writeTVar operations
          -- must be inside the same STM transaction.
          writeTVar (want wanton) rest
          writeTBQueue egressQueue (TLSRDemand mpc md wanton)
      -- return data to send
      pure frag
    let sdu = SDU {
                msHeader = SDUHeader {
                    -- NOTE(review): the timestamp is always 0 here;
                    -- presumably the bearer's write fills in the real
                    -- value - confirm against the Bearer implementation.
                    mhTimestamp = (RemoteClockModel 0),
                    mhNum       = mpc,
                    mhDir       = md,
                    -- blob is at most sduSize bytes long because of the
                    -- splitAt above.
                    mhLength    = fromIntegral $ BL.length blob
                  },
                msBlob = blob
              }
    -- The time returned by write is deliberately discarded.
    void $ write timeout sdu
    --paceTransmission tNow