{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE RankNTypes          #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- | A mux bearer backed by a pair of bounded STM queues.
module Network.Mux.Bearer.Queues
  ( QueueChannel (..)
  , queueChannelAsMuxBearer
  ) where

import Data.ByteString.Lazy qualified as BL

import Control.Concurrent.Class.MonadSTM.Strict
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Control.Tracer

import Network.Mux.Codec qualified as Mx
import Network.Mux.Time as Mx
import Network.Mux.Timeout qualified as Mx
import Network.Mux.Trace qualified as Mx
import Network.Mux.Types (MuxBearer)
import Network.Mux.Types qualified as Mx

-- | The two queues a queue-backed bearer operates on.  Every queue element
-- is a lazy 'BL.ByteString'.
data QueueChannel m = QueueChannel
  { readQueue  :: StrictTBQueue m BL.ByteString
    -- ^ Queue the bearer's read operation takes its input from.
  , writeQueue :: StrictTBQueue m BL.ByteString
    -- ^ Queue the bearer's write operation puts encoded SDUs on.
  }

-- | Build a 'MuxBearer' named @"queue-channel"@ on top of a 'QueueChannel'.
--
-- * Reading takes one element from 'readQueue', decodes its first
--   8 bytes as the SDU header and uses the remaining bytes as the SDU
--   blob.  A header that fails to decode is rethrown with 'throwIO'.
--   The blob length is not checked against the decoded header.
--
-- * Writing stamps the SDU with the current monotonic time, encodes it and
--   puts the resulting bytes on 'writeQueue' as a single element.
--
-- Both operations ignore the timeout function they are given, and both
-- return the monotonic time they sampled.
queueChannelAsMuxBearer
  :: forall m.
     ( MonadSTM           m
     , MonadMonotonicTime m
     , MonadThrow         m
     )
  => Mx.SDUSize            -- ^ stored unchanged in the bearer's @sduSize@ field
  -> Tracer m Mx.MuxTrace  -- ^ receives the send / receive trace events
  -> QueueChannel m        -- ^ queues to read from and write to
  -> MuxBearer m
queueChannelAsMuxBearer sduSize tracer QueueChannel { writeQueue, readQueue } =
    Mx.MuxBearer
      { Mx.read    = readMux
      , Mx.write   = writeMux
      , Mx.sduSize = sduSize
      , Mx.name    = "queue-channel"
      }
  where
    -- Number of leading bytes of a queue element handed to the header
    -- decoder; everything after them is treated as the payload.
    headerLength = 8

    -- Receive one SDU.  The timeout function is deliberately unused.
    readMux :: Mx.TimeoutFn m -> m (Mx.MuxSDU, Time)
    readMux _ = do
      traceWith tracer Mx.MuxTraceRecvHeaderStart
      buf <- atomically $ readTBQueue readQueue
      let (hbuf, payload) = BL.splitAt headerLength buf
      case Mx.decodeMuxSDU hbuf of
        Left e ->
          throwIO e
        Right header -> do
          traceWith tracer $ Mx.MuxTraceRecvHeaderEnd (Mx.msHeader header)
          ts <- getMonotonicTime
          traceWith tracer $
            Mx.MuxTraceRecvDeltaQObservation (Mx.msHeader header) ts
          -- The decoder only saw the header bytes, so attach the payload.
          pure (header { Mx.msBlob = payload }, ts)

    -- Send one SDU.  The timeout function is deliberately unused.
    writeMux :: Mx.TimeoutFn m -> Mx.MuxSDU -> m Time
    writeMux _ sdu = do
      ts <- getMonotonicTime
      let ts32 = Mx.timestampMicrosecondsLow32Bits ts
          sdu' = Mx.setTimestamp sdu (Mx.RemoteClockModel ts32)
          buf  = Mx.encodeMuxSDU sdu'
      traceWith tracer $ Mx.MuxTraceSendStart (Mx.msHeader sdu')
      atomically $ writeTBQueue writeQueue buf
      traceWith tracer Mx.MuxTraceSendEnd
      pure ts