{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TypeFamilies #-}
module Network.Mux.Egress
( muxer
, EgressQueue
, TranslocationServiceRequest (..)
, Wanton (..)
) where
import Control.Monad
import Data.ByteString.Lazy qualified as BL
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadTimer.SI hiding (timeout)
import Network.Mux.Timeout
import Network.Mux.Types
-- | Bounded STM queue of pending 'TranslocationServiceRequest's.  'muxer'
-- reads requests from it, and 'processSingleWanton' writes a request back
-- to it whenever a payload still has bytes left to send.
type EgressQueue m = StrictTBQueue m (TranslocationServiceRequest m)
-- | A request to send data on behalf of one mini-protocol.
--
-- The request names the mini-protocol number and direction that are copied
-- into the header of every SDU produced for it, and carries the 'Wanton'
-- holding the bytes that are still to be sent.
data TranslocationServiceRequest m =
     -- | Demand that the contents of the 'Wanton' be sent for the given
     -- mini-protocol number and direction.
     TLSRDemand !MiniProtocolNum !MiniProtocolDir !(Wanton m)
-- | The bytes that remain to be sent for one 'TranslocationServiceRequest'.
--
-- 'processSingleWanton' takes a fragment off the front of the stored
-- 'BL.ByteString' each time the request is serviced, and stores 'BL.empty'
-- once the final fragment has been taken.
newtype Wanton m = Wanton { want :: StrictTVar m BL.ByteString }
-- | Service the egress queue forever, writing SDUs to the bearer.
--
-- Each iteration:
--
-- 1. blocks until a request is available on the egress queue and turns it
--    into one SDU via 'processSingleWanton';
-- 2. opportunistically extends that SDU into a batch (see @buildBatch@);
-- 3. hands the batch to the bearer's @writeMany@ together with the timeout
--    function supplied by 'withTimeoutSerial';
-- 4. if the queue is empty afterwards, sleeps for @egressInterval@ minus the
--    time the iteration took.
--
-- The action never returns normally, hence the polymorphic result type.
muxer
    :: forall m void.
       ( MonadAsync m
       , MonadDelay m
       , MonadFork m
       , MonadMask m
       , MonadThrow (STM m)
       , MonadTimer m
       )
    => EgressQueue m
    -> Bearer m
    -> m void
muxer egressQueue Bearer { writeMany, sduSize, batchSize, egressInterval } =
    withTimeoutSerial $ \timeout ->
    forever $ do
      start <- getMonotonicTime
      -- Blocking read: there is nothing to do until a request arrives.
      TLSRDemand mpc md d <- atomically $ readTBQueue egressQueue
      sdu <- processSingleWanton egressQueue sduSize mpc md d
      sdus <- buildBatch [sdu] (sduLength sdu)
      void $ writeMany timeout sdus
      end <- getMonotonicTime
      queueIsEmpty <- atomically $ isEmptyTBQueue egressQueue
      when queueIsEmpty $ do
        -- Only the remainder of the interval is slept, so the time already
        -- spent building and writing the batch counts towards it.
        let delta = diffTime end start
        threadDelay (egressInterval - delta)
  where
    -- Upper bound on the number of SDUs submitted in a single batch.
    maxSDUsPerBatch :: Int
    maxSDUsPerBatch = 100

    -- Length of an SDU as accounted against 'batchSize': the header length
    -- plus the length reported by 'msLength'.
    sduLength :: SDU -> Int
    sduLength sdu = fromIntegral msHeaderLength + fromIntegral (msLength sdu)

    -- Extend the initial SDU list @s@ (whose accumulated length is @sl@)
    -- with SDUs for further requests, stopping as soon as the batch holds
    -- 'maxSDUsPerBatch' SDUs, its accumulated length reaches 'batchSize',
    -- or the egress queue has no request ready.  Requests are still
    -- serviced one SDU at a time.  The result is in queue order.
    --
    -- No local type signatures here: they would have to mention the outer
    -- @m@, which needs ScopedTypeVariables.
    buildBatch s sl = reverse <$> go s (length s) sl
      where
        -- @sdus@ is accumulated in reverse; @count@ mirrors @length sdus@
        -- so the list is not re-walked on every step.
        go sdus !count !sdusLength
          | count >= maxSDUsPerBatch = return sdus
          | sdusLength >= batchSize  = return sdus
          | otherwise = do
              -- Non-blocking read: a batch is never delayed waiting for
              -- more work.
              demand_m <- atomically $ tryReadTBQueue egressQueue
              case demand_m of
                Just (TLSRDemand mpc md d) -> do
                  sdu <- processSingleWanton egressQueue sduSize mpc md d
                  go (sdu : sdus) (count + 1) (sdusLength + sduLength sdu)
                Nothing -> return sdus

-- | Take at most one SDU's worth of bytes out of a 'Wanton' and wrap them
-- in an 'SDU'.
--
-- The first @sduSize@ bytes of the stored payload become the SDU's blob.
-- If bytes remain, they are written back to the 'Wanton' and the request
-- is appended to the egress queue again; otherwise the 'Wanton' is set to
-- the empty string.
--
-- The returned SDU carries the given mini-protocol number and direction,
-- a timestamp of @RemoteClockModel 0@, and a header length equal to the
-- length of the blob.
processSingleWanton :: MonadSTM m
                    => EgressQueue m
                    -> SDUSize
                    -> MiniProtocolNum
                    -> MiniProtocolDir
                    -> Wanton m
                    -> m SDU
processSingleWanton egressQueue (SDUSize sduSize)
                    mpc md wanton = do
    blob <- atomically $ do
      -- Split off the next fragment.
      d <- readTVar (want wanton)
      let (frag, rest) = BL.splitAt (fromIntegral sduSize) d
      if BL.null rest
        then writeTVar (want wanton) BL.empty
        else do
          -- Storing the remainder and re-queueing the request happen in
          -- the same transaction as the read, so the three steps take
          -- effect atomically.
          writeTVar (want wanton) rest
          writeTBQueue egressQueue (TLSRDemand mpc md wanton)
      pure frag
    return SDU {
        msHeader = SDUHeader {
            mhTimestamp = RemoteClockModel 0,
            mhNum       = mpc,
            mhDir       = md,
            mhLength    = fromIntegral $ BL.length blob
          },
        msBlob = blob
      }