{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE RankNTypes          #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- | A mux 'Bearer' implemented on top of a pair of 'StrictTBQueue's instead
-- of a real network connection.
module Network.Mux.Bearer.Queues
  ( QueueChannel (..)
  , queueChannelAsBearer
  ) where

import Data.ByteString.Lazy qualified as BL

import Control.Concurrent.Class.MonadSTM.Strict
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Control.Tracer

import Network.Mux.Codec qualified as Mx
import Network.Mux.Time as Mx
import Network.Mux.Timeout qualified as Mx
import Network.Mux.Trace qualified as Mx
import Network.Mux.Types (Bearer)
import Network.Mux.Types qualified as Mx

-- | The two queues a queue-backed bearer operates on.
data QueueChannel m = QueueChannel
  { readQueue  :: StrictTBQueue m BL.ByteString
    -- ^ Queue the bearer's @read@ takes its input from.
  , writeQueue :: StrictTBQueue m BL.ByteString
    -- ^ Queue the bearer's @write@ puts encoded SDUs into.
  }

-- | Build a 'Bearer' that exchanges SDUs through the queues of a
-- 'QueueChannel'.
--
-- * @read@ takes one element from 'readQueue', passes its first 8 bytes to
--   'Mx.decodeSDU' and uses the remaining bytes as the SDU's 'Mx.msBlob'.
--   A decoding failure is rethrown with 'throwIO'.  The returned 'Time' is
--   the monotonic time sampled after the header was decoded.
--
-- * @write@ samples the monotonic clock, stamps the SDU with
--   'Mx.timestampMicrosecondsLow32Bits' of that sample, encodes it with
--   'Mx.encodeSDU' and puts the result into 'writeQueue' as one element.
--   It returns the sampled time.
--
-- * @writeMany@ samples the monotonic clock once, then writes every SDU in
--   order using @write@; it returns that initial sample.
--
-- The 'Mx.TimeoutFn' handed to @read@ and @write@ is ignored.
queueChannelAsBearer
  :: forall m.
     ( MonadSTM m
     , MonadMonotonicTime m
     , MonadThrow m
     )
  => Mx.SDUSize
  -> QueueChannel m
  -> Bearer m
queueChannelAsBearer sduSize QueueChannel { writeQueue, readQueue } =
    Mx.Bearer
      { Mx.read           = readMux
      , Mx.write          = writeMux
      , Mx.writeMany      = writeMuxMany
      , Mx.sduSize        = sduSize
        -- The batch size is twice the configured SDU size.
      , Mx.batchSize      = 2 * fromIntegral (Mx.getSDUSize sduSize)
      , Mx.name           = "queue-channel"
      , Mx.egressInterval = 0
      }
  where
    -- | Receive a single SDU from 'readQueue'.
    readMux :: Tracer m Mx.BearerTrace -> Mx.TimeoutFn m -> m (Mx.SDU, Time)
    readMux tracer _ = do
      traceWith tracer Mx.TraceRecvHeaderStart
      buf <- atomically $ readTBQueue readQueue
      -- Each queue element is treated as one SDU: the first 8 bytes are
      -- decoded as the header, everything after them is the payload.
      let (hbuf, payload) = BL.splitAt 8 buf
      case Mx.decodeSDU hbuf of
        Left e       -> throwIO e
        Right header -> do
          traceWith tracer $ Mx.TraceRecvHeaderEnd (Mx.msHeader header)
          ts <- getMonotonicTime
          traceWith tracer $
            Mx.TraceRecvDeltaQObservation (Mx.msHeader header) ts
          return (header { Mx.msBlob = payload }, ts)

    -- | Timestamp, encode and enqueue a single SDU on 'writeQueue'.
    writeMux :: Tracer m Mx.BearerTrace -> Mx.TimeoutFn m -> Mx.SDU -> m Time
    writeMux tracer _ sdu = do
      ts <- getMonotonicTime
      let ts32 = Mx.timestampMicrosecondsLow32Bits ts
          sdu' = Mx.setTimestamp sdu (Mx.RemoteClockModel ts32)
          buf  = Mx.encodeSDU sdu'
      traceWith tracer $ Mx.TraceSendStart (Mx.msHeader sdu')
      atomically $ writeTBQueue writeQueue buf
      traceWith tracer Mx.TraceSendEnd
      return ts

    -- | Write a list of SDUs one by one.  The returned time is sampled
    -- before the first write; each individual write still stamps its SDU
    -- with its own clock sample.
    writeMuxMany
      :: Tracer m Mx.BearerTrace -> Mx.TimeoutFn m -> [Mx.SDU] -> m Time
    writeMuxMany tracer timeoutFn sdus = do
      ts <- getMonotonicTime
      mapM_ (writeMux tracer timeoutFn) sdus
      return ts