{-# LANGUAGE BangPatterns          #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns        #-}
{-# LANGUAGE RankNTypes            #-}
{-# LANGUAGE TypeFamilies          #-}

module Network.Mux.Egress
  ( muxer
    -- $egress
    -- $servicingsSemantics
  , EgressQueue
  , TranslocationServiceRequest (..)
  , Wanton (..)
  ) where

import Control.Monad
import Data.ByteString.Lazy qualified as BL

import Control.Concurrent.Class.MonadSTM.Strict
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadTimer.SI hiding (timeout)

import Network.Mux.Timeout
import Network.Mux.Types

-- $servicingsSemantics
-- = Desired Servicing Semantics
--
--  == /Constructing Fairness/
--
--   In this context we are defining fairness as:
--    - no starvation
--    - when presented with equal demand (from a selection of mini
--      protocols) deliver "equal" service.
--
--   Equality here might be in terms of equal service rate of
--   requests (or segmented requests) and/or in terms of effective
--   (SDU) data rates.
--
--
--  Notes:
--
--   1) It is assumed that (for a given peer) that bulk delivery of
--      blocks (i.e. in recovery mode) and normal, interactive,
--      operation (e.g. chain following) are mutually exclusive. As
--      such there is no requirement to create a notion of
--      prioritisation between such traffic.
--
--   2) We are assuming that the underlying TCP/IP bearer is managed
--      so that individual Mux-layer PDUs are paced. a) this is necessary
--      to mitigate head-of-line blocking effects (i.e. arbitrary
--      amounts of data accruing in the O/S kernel); b) ensuring that
--      any host egress data rate limits can be respected / enforced.
--
--  == /Current Caveats/
--
--  1) Not considering how mini-protocol associations are constructed
--     (depending on deployment model this might be resolved within
--     the instantiation of the peer relationship)
--
--  2) Not yet considered notion of orderly termination - this not
--     likely to be used in an operational context, but may be needed
--     for test harness use.
--
--  == /Principle of Operation/
--
--
--  Egress direction (mini protocol instance to remote peer)
--
--  The request for service (the demand) from a mini protocol is
--  encapsulated in a `Wanton`, such `Wanton`s are placed in a (finite)
--  queue (e.g TBMQ) of `TranslocationServiceRequest`s.
--

-- $egress
-- = Egress Path
--
-- > ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ Every mode per miniprotocol
-- > │ muxDuplex │ │ muxDuplex │ │ muxDuplex │ │ muxDuplex │ has a dedicated thread which
-- > │ Initiator │ │ Responder │ │ Initiator │ │ Responder │ will send ByteStrings of CBOR
-- > │ ChainSync │ │ ChainSync │ │ BlockFetch│ │ BlockFetch│ encoded data.
-- > └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘
-- >       │             │             │             │
-- >       │             │             │             │
-- >       ╰─────────────┴──────┬──────┴─────────────╯
-- >                            │
-- >                     application data
-- >                            │
-- >                         ░░░▼░░
-- >                         ░│  │░ For a given Mux Bearer there is a single egress
-- >                         ░│ci│░ queue shared among all miniprotocols. To ensure
-- >                         ░│cr│░ fairness each miniprotocol can at most have one
-- >                         ░└──┘░ message in the queue, see Desired Servicing
-- >                         ░░░│░░ Semantics.
-- >                           ░│░
-- >                       ░░░░░▼░░░
-- >                       ░┌─────┐░ The egress queue is served by a dedicated thread
-- >                       ░│ mux │░ which chops up the CBOR data into MuxSDUs with at
-- >                       ░└─────┘░ most sduSize bytes of data in them.
-- >                       ░░░░│░░░░
-- >                          ░│░ MuxSDUs
-- >                          ░│░
-- >                  ░░░░░░░░░▼░░░░░░░░░░
-- >                  ░┌────────────────┐░
-- >                  ░│ Bearer.write() │░ Mux Bearer implementation specific write
-- >                  ░└────────────────┘░
-- >                  ░░░░░░░░░│░░░░░░░░░░
-- >                           │ ByteStrings
-- >                           ▼
-- >                           ●

type EgressQueue m = StrictTBQueue m (TranslocationServiceRequest m)

-- | A TranslocationServiceRequest is a demand for the translocation
--  of a single mini-protocol message. This message can be of
--  arbitrary (yet bounded) size. This multiplexing layer is
--  responsible for the segmentation of concrete representation into
--  appropriate SDU's for onward transmission.
data TranslocationServiceRequest m =
     TLSRDemand !MiniProtocolNum !MiniProtocolDir !(Wanton m)

-- | A Wanton represent the concrete data to be translocated, note that the
--  TVar becoming empty indicates -- that the last fragment of the data has
--  been enqueued on the -- underlying bearer.
newtype Wanton m = Wanton { forall (m :: * -> *). Wanton m -> StrictTVar m ByteString
want :: StrictTVar m BL.ByteString }


-- | Process the messages from the mini protocols - there is a single
-- shared FIFO that contains the items of work. This is processed so
-- that each active demand gets a `maxSDU`s work of data processed
-- each time it gets to the front of the queue
muxer
    :: forall m void.
       ( MonadAsync m
       , MonadDelay m
       , MonadFork m
       , MonadMask m
       , MonadThrow (STM m)
       , MonadTimer m
       )
    => EgressQueue m
    -> Bearer m
    -> m void
muxer :: forall (m :: * -> *) void.
(MonadAsync m, MonadDelay m, MonadFork m, MonadMask m,
 MonadThrow (STM m), MonadTimer m) =>
EgressQueue m -> Bearer m -> m void
muxer EgressQueue m
egressQueue Bearer { TimeoutFn m -> [SDU] -> m Time
writeMany :: TimeoutFn m -> [SDU] -> m Time
writeMany :: forall (m :: * -> *). Bearer m -> TimeoutFn m -> [SDU] -> m Time
writeMany, SDUSize
sduSize :: SDUSize
sduSize :: forall (m :: * -> *). Bearer m -> SDUSize
sduSize, Int
batchSize :: Int
batchSize :: forall (m :: * -> *). Bearer m -> Int
batchSize, DiffTime
egressInterval :: DiffTime
egressInterval :: forall (m :: * -> *). Bearer m -> DiffTime
egressInterval } =
    (TimeoutFn m -> m void) -> m void
forall (m :: * -> *) b.
(MonadAsync m, MonadFork m, MonadMonotonicTime m, MonadTimer m,
 MonadMask m, MonadThrow (STM m)) =>
(TimeoutFn m -> m b) -> m b
withTimeoutSerial ((TimeoutFn m -> m void) -> m void)
-> (TimeoutFn m -> m void) -> m void
forall a b. (a -> b) -> a -> b
$ \TimeoutFn m
timeout ->
    m () -> m void
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m void) -> m () -> m void
forall a b. (a -> b) -> a -> b
$ do
      start <- m Time
forall (m :: * -> *). MonadMonotonicTime m => m Time
getMonotonicTime
      TLSRDemand mpc md d <- atomically $ readTBQueue egressQueue
      sdu <- processSingleWanton egressQueue sduSize mpc md d
      sdus <- buildBatch [sdu] (sduLength sdu)
      void $ writeMany timeout sdus
      end <- getMonotonicTime
      empty <- atomically $ isEmptyTBQueue egressQueue
      when (empty) $ do
        let delta = Time -> Time -> DiffTime
diffTime Time
end Time
start
        threadDelay (egressInterval - delta)

  where
    maxSDUsPerBatch :: Int
    maxSDUsPerBatch :: Int
maxSDUsPerBatch = Int
100

    sduLength :: SDU -> Int
    sduLength :: SDU -> Int
sduLength SDU
sdu = Int64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int64
msHeaderLength Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Word16 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (SDU -> Word16
msLength SDU
sdu)

    -- Build a batch of SDUs to submit in one go to the bearer.
    -- The egress queue is still processed one SDU at the time
    -- to ensure that we don't cause starvation.
    -- The batch size is either limited by the bearer
    -- (e.g the SO_SNDBUF for Socket) or number of SDUs.
    --
    buildBatch :: [SDU] -> Int -> m [SDU]
buildBatch [SDU]
s Int
sl = [SDU] -> [SDU]
forall a. [a] -> [a]
reverse ([SDU] -> [SDU]) -> m [SDU] -> m [SDU]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [SDU] -> Int -> m [SDU]
go [SDU]
s Int
sl
     where
      go :: [SDU] -> Int -> m [SDU]
go [SDU]
sdus Int
_ | [SDU] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [SDU]
sdus Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
maxSDUsPerBatch   = [SDU] -> m [SDU]
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return [SDU]
sdus
      go [SDU]
sdus Int
sdusLength | Int
sdusLength Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
batchSize = [SDU] -> m [SDU]
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return [SDU]
sdus
      go [SDU]
sdus !Int
sdusLength = do
        demand_m <- STM m (Maybe (TranslocationServiceRequest m))
-> m (Maybe (TranslocationServiceRequest m))
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (Maybe (TranslocationServiceRequest m))
 -> m (Maybe (TranslocationServiceRequest m)))
-> STM m (Maybe (TranslocationServiceRequest m))
-> m (Maybe (TranslocationServiceRequest m))
forall a b. (a -> b) -> a -> b
$ EgressQueue m -> STM m (Maybe (TranslocationServiceRequest m))
forall (m :: * -> *) a.
MonadSTM m =>
StrictTBQueue m a -> STM m (Maybe a)
tryReadTBQueue EgressQueue m
egressQueue
        case demand_m of
             Just (TLSRDemand MiniProtocolNum
mpc MiniProtocolDir
md Wanton m
d) -> do
               sdu <- EgressQueue m
-> SDUSize
-> MiniProtocolNum
-> MiniProtocolDir
-> Wanton m
-> m SDU
forall (m :: * -> *).
MonadSTM m =>
EgressQueue m
-> SDUSize
-> MiniProtocolNum
-> MiniProtocolDir
-> Wanton m
-> m SDU
processSingleWanton EgressQueue m
egressQueue SDUSize
sduSize MiniProtocolNum
mpc MiniProtocolDir
md Wanton m
d
               go (sdu:sdus) (sdusLength + sduLength sdu)
             Maybe (TranslocationServiceRequest m)
Nothing -> [SDU] -> m [SDU]
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return [SDU]
sdus

-- | Pull a `maxSDU`s worth of data out out the `Wanton` - if there is
-- data remaining requeue the `TranslocationServiceRequest` (this
-- ensures that any other items on the queue will get some service
-- first.
processSingleWanton :: MonadSTM m
                    => EgressQueue m
                    -> SDUSize
                    -> MiniProtocolNum
                    -> MiniProtocolDir
                    -> Wanton m
                    -> m SDU
processSingleWanton :: forall (m :: * -> *).
MonadSTM m =>
EgressQueue m
-> SDUSize
-> MiniProtocolNum
-> MiniProtocolDir
-> Wanton m
-> m SDU
processSingleWanton EgressQueue m
egressQueue (SDUSize Word16
sduSize)
                    MiniProtocolNum
mpc MiniProtocolDir
md Wanton m
wanton = do
    blob <- STM m ByteString -> m ByteString
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m ByteString -> m ByteString)
-> STM m ByteString -> m ByteString
forall a b. (a -> b) -> a -> b
$ do
      -- extract next SDU
      d <- StrictTVar m ByteString -> STM m ByteString
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar (Wanton m -> StrictTVar m ByteString
forall (m :: * -> *). Wanton m -> StrictTVar m ByteString
want Wanton m
wanton)
      let (frag, rest) = BL.splitAt (fromIntegral sduSize) d
      -- if more to process then enqueue remaining work
      if BL.null rest
        then writeTVar (want wanton) BL.empty
        else do
          -- Note that to preserve bytestream ordering within a given
          -- miniprotocol the readTVar and writeTVar operations
          -- must be inside the same STM transaction.
          writeTVar (want wanton) rest
          writeTBQueue egressQueue (TLSRDemand mpc md wanton)
      -- return data to send
      pure frag
    let sdu = SDU {
                msHeader :: SDUHeader
msHeader = SDUHeader {
                    mhTimestamp :: RemoteClockModel
mhTimestamp = (Word32 -> RemoteClockModel
RemoteClockModel Word32
0),
                    mhNum :: MiniProtocolNum
mhNum       = MiniProtocolNum
mpc,
                    mhDir :: MiniProtocolDir
mhDir       = MiniProtocolDir
md,
                    mhLength :: Word16
mhLength    = Int64 -> Word16
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> Word16) -> Int64 -> Word16
forall a b. (a -> b) -> a -> b
$ ByteString -> Int64
BL.length ByteString
blob
                  },
                msBlob :: ByteString
msBlob = ByteString
blob
              }
    return sdu
    --paceTransmission tNow