{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns        #-}
{-# LANGUAGE RankNTypes            #-}
{-# LANGUAGE TypeFamilies          #-}

module Network.Mux.Egress
  ( muxer
    -- $egress
    -- $servicingsSemantics
  , EgressQueue
  , TranslocationServiceRequest (..)
  , Wanton (..)
  ) where

import Control.Monad
import Data.ByteString.Lazy qualified as BL

import Control.Concurrent.Class.MonadSTM.Strict
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTimer.SI hiding (timeout)

import Network.Mux.Timeout
import Network.Mux.Types

-- $servicingsSemantics
-- = Desired Servicing Semantics
--
--  == /Constructing Fairness/
--
--   In this context we are defining fairness as:
--    - no starvation
--    - when presented with equal demand (from a selection of mini
--      protocols) deliver "equal" service.
--
--   Equality here might be in terms of equal service rate of
--   requests (or segmented requests) and/or in terms of effective
--   (SDU) data rates.
--
--
--  Notes:
--
--   1) It is assumed (for a given peer) that bulk delivery of
--      blocks (i.e. in recovery mode) and normal, interactive,
--      operation (e.g. chain following) are mutually exclusive. As
--      such there is no requirement to create a notion of
--      prioritisation between such traffic.
--
--   2) We are assuming that the underlying TCP/IP bearer is managed
--      so that individual Mux-layer PDUs are paced. a) this is necessary
--      to mitigate head-of-line blocking effects (i.e. arbitrary
--      amounts of data accruing in the O/S kernel); b) ensuring that
--      any host egress data rate limits can be respected / enforced.
--
--  == /Current Caveats/
--
--  1) Not considering how mini-protocol associations are constructed
--     (depending on deployment model this might be resolved within
--     the instantiation of the peer relationship)
--
--  2) The notion of orderly termination has not yet been considered -
--     this is not likely to be used in an operational context, but may
--     be needed for test harness use.
--
--  == /Principle of Operation/
--
--
--  Egress direction (mini protocol instance to remote peer)
--
--  The request for service (the demand) from a mini protocol is
--  encapsulated in a `Wanton`; such `Wanton`s are placed in a (finite)
--  queue (e.g. TBMQ) of `TranslocationServiceRequest`s.
--

-- $egress
-- = Egress Path
--
-- > ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ Every mode per miniprotocol
-- > │ muxDuplex │ │ muxDuplex │ │ muxDuplex │ │ muxDuplex │ has a dedicated thread which
-- > │ Initiator │ │ Responder │ │ Initiator │ │ Responder │ will send ByteStrings of CBOR
-- > │ ChainSync │ │ ChainSync │ │ BlockFetch│ │ BlockFetch│ encoded data.
-- > └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘
-- >       │             │             │             │
-- >       │             │             │             │
-- >       ╰─────────────┴──────┬──────┴─────────────╯
-- >                            │
-- >                     application data
-- >                            │
-- >                         ░░░▼░░
-- >                         ░│  │░ For a given Mux Bearer there is a single egress
-- >                         ░│ci│░ queue shared among all miniprotocols. To ensure
-- >                         ░│cr│░ fairness each miniprotocol can at most have one
-- >                         ░└──┘░ message in the queue, see Desired Servicing
-- >                         ░░░│░░ Semantics.
-- >                           ░│░
-- >                       ░░░░░▼░░░
-- >                       ░┌─────┐░ The egress queue is served by a dedicated thread
-- >                       ░│ mux │░ which chops up the CBOR data into MuxSDUs with at
-- >                       ░└─────┘░ most sduSize bytes of data in them.
-- >                       ░░░░│░░░░
-- >                          ░│░ MuxSDUs
-- >                          ░│░
-- >                  ░░░░░░░░░▼░░░░░░░░░░
-- >                  ░┌────────────────┐░
-- >                  ░│ Bearer.write() │░ Mux Bearer implementation specific write
-- >                  ░└────────────────┘░
-- >                  ░░░░░░░░░│░░░░░░░░░░
-- >                           │ ByteStrings
-- >                           ▼
-- >                           ●

-- | The egress queue: a bounded STM queue of
-- 'TranslocationServiceRequest's. 'muxer' reads requests from it, and
-- a request whose 'Wanton' still holds unsent data is written back to
-- it after one SDU's worth has been taken.
type EgressQueue m = StrictTBQueue m (TranslocationServiceRequest m)

-- | A 'TranslocationServiceRequest' is a demand for the translocation
--  of a single mini-protocol message. This message can be of
--  arbitrary (yet bounded) size. This multiplexing layer is
--  responsible for the segmentation of the concrete representation
--  into appropriate SDUs for onward transmission.
--
--  The 'TLSRDemand' constructor carries the mini-protocol number, the
--  mini-protocol direction and the 'Wanton' holding the data that is
--  still to be sent; all three fields are strict.
data TranslocationServiceRequest m =
     TLSRDemand !MiniProtocolNum !MiniProtocolDir !(Wanton m)

-- | A 'Wanton' represents the concrete data to be translocated. Note that
--  the TVar becoming empty indicates that the last fragment of the data has
--  been enqueued on the underlying bearer.
newtype Wanton m = Wanton { want :: StrictTVar m BL.ByteString }


-- | Process the messages from the mini protocols - there is a single
-- shared FIFO that contains the items of work. This is processed so
-- that each active demand gets one SDU's worth (at most 'sduSize' bytes)
-- of data processed each time it gets to the front of the queue.
--
-- The loop never returns normally, which is why the result type is the
-- fully polymorphic @m void@.
muxer
    :: ( MonadAsync m
       , MonadFork m
       , MonadMask m
       , MonadThrow (STM m)
       , MonadTimer m
       )
    => EgressQueue m
    -> Bearer m
    -> m void
muxer egressQueue bearer =
    -- A single timeout function is obtained once and handed to every
    -- bearer write performed by 'processSingleWanton'.
    withTimeoutSerial $ \timeout ->
    forever $ do
      -- Take the next request from the head of the shared queue.
      TLSRDemand mpc md d <- atomically $ readTBQueue egressQueue
      processSingleWanton egressQueue bearer timeout mpc md d

-- | Pull one SDU's worth (at most 'sduSize' bytes) of data out of the
-- 'Wanton' and write it to the bearer. If there is data remaining,
-- requeue the 'TranslocationServiceRequest' (this ensures that any other
-- items on the queue will get some service first).
processSingleWanton :: MonadSTM m
                    => EgressQueue m
                    -> Bearer m
                    -> TimeoutFn m
                    -> MiniProtocolNum
                    -> MiniProtocolDir
                    -> Wanton m
                    -> m ()
processSingleWanton egressQueue Bearer { write, sduSize }
                    timeout mpc md wanton = do
    blob <- atomically $ do
      -- extract next SDU
      d <- readTVar (want wanton)
      let (frag, rest) = BL.splitAt (fromIntegral (getSDUSize sduSize)) d
      -- if more to process then enqueue remaining work
      if BL.null rest
        then writeTVar (want wanton) BL.empty
        else do
          -- Note that to preserve bytestream ordering within a given
          -- miniprotocol the readTVar and writeTVar operations
          -- must be inside the same STM transaction.
          writeTVar (want wanton) rest
          writeTBQueue egressQueue (TLSRDemand mpc md wanton)
      -- return data to send
      pure frag
    let sdu = SDU {
                msHeader = SDUHeader {
                    -- NOTE(review): the timestamp is a placeholder here;
                    -- presumably the bearer's write fills it in - confirm.
                    mhTimestamp = RemoteClockModel 0,
                    mhNum       = mpc,
                    mhDir       = md,
                    -- blob holds at most sduSize bytes because of the
                    -- splitAt above.
                    mhLength    = fromIntegral $ BL.length blob
                  },
                msBlob = blob
              }
    -- The value returned by the bearer write is deliberately discarded.
    void $ write timeout sdu
    --paceTransmission tNow