{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} module Network.Mux.Bearer.Queues ( QueueChannel (..) , queueChannelAsBearer ) where import Data.ByteString.Lazy qualified as BL import Control.Concurrent.Class.MonadSTM.Strict import Control.Monad.Class.MonadThrow import Control.Monad.Class.MonadTime.SI import Control.Tracer import Network.Mux.Codec qualified as Mx import Network.Mux.Time as Mx import Network.Mux.Timeout qualified as Mx import Network.Mux.Trace qualified as Mx import Network.Mux.Types (Bearer) import Network.Mux.Types qualified as Mx data QueueChannel m = QueueChannel { forall (m :: * -> *). QueueChannel m -> StrictTBQueue m ByteString readQueue :: StrictTBQueue m BL.ByteString, forall (m :: * -> *). QueueChannel m -> StrictTBQueue m ByteString writeQueue :: StrictTBQueue m BL.ByteString } queueChannelAsBearer :: forall m. ( MonadSTM m , MonadMonotonicTime m , MonadThrow m ) => Mx.SDUSize -> Tracer m Mx.Trace -> QueueChannel m -> Bearer m queueChannelAsBearer :: forall (m :: * -> *). (MonadSTM m, MonadMonotonicTime m, MonadThrow m) => SDUSize -> Tracer m Trace -> QueueChannel m -> Bearer m queueChannelAsBearer SDUSize sduSize Tracer m Trace tracer QueueChannel { StrictTBQueue m ByteString writeQueue :: forall (m :: * -> *). QueueChannel m -> StrictTBQueue m ByteString writeQueue :: StrictTBQueue m ByteString writeQueue, StrictTBQueue m ByteString readQueue :: forall (m :: * -> *). QueueChannel m -> StrictTBQueue m ByteString readQueue :: StrictTBQueue m ByteString readQueue } = do Mx.Bearer { read :: TimeoutFn m -> m (SDU, Time) Mx.read = TimeoutFn m -> m (SDU, Time) readMux, write :: TimeoutFn m -> SDU -> m Time Mx.write = TimeoutFn m -> SDU -> m Time writeMux, sduSize :: SDUSize Mx.sduSize = SDUSize sduSize, name :: String Mx.name = String "queue-channel" } where readMux :: Mx.TimeoutFn m -> m (Mx.SDU, Time) readMux :: TimeoutFn m -> m (SDU, Time) readMux TimeoutFn m _ = do Tracer m Trace -> Trace -> m () forall (m :: * -> *) a. Tracer m a -> a -> m () traceWith Tracer m Trace tracer Trace Mx.TraceRecvHeaderStart buf <- STM m ByteString -> m ByteString forall a. HasCallStack => STM m a -> m a forall (m :: * -> *) a. (MonadSTM m, HasCallStack) => STM m a -> m a atomically (STM m ByteString -> m ByteString) -> STM m ByteString -> m ByteString forall a b. (a -> b) -> a -> b $ StrictTBQueue m ByteString -> STM m ByteString forall (m :: * -> *) a. MonadSTM m => StrictTBQueue m a -> STM m a readTBQueue StrictTBQueue m ByteString readQueue let (hbuf, payload) = BL.splitAt 8 buf case Mx.decodeSDU hbuf of Left Error e -> Error -> m (SDU, Time) forall e a. Exception e => e -> m a forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a throwIO Error e Right SDU header -> do Tracer m Trace -> Trace -> m () forall (m :: * -> *) a. Tracer m a -> a -> m () traceWith Tracer m Trace tracer (Trace -> m ()) -> Trace -> m () forall a b. (a -> b) -> a -> b $ SDUHeader -> Trace Mx.TraceRecvHeaderEnd (SDU -> SDUHeader Mx.msHeader SDU header) ts <- m Time forall (m :: * -> *). MonadMonotonicTime m => m Time getMonotonicTime traceWith tracer $ Mx.TraceRecvDeltaQObservation (Mx.msHeader header) ts return (header {Mx.msBlob = payload}, ts) writeMux :: Mx.TimeoutFn m -> Mx.SDU -> m Time writeMux :: TimeoutFn m -> SDU -> m Time writeMux TimeoutFn m _ SDU sdu = do ts <- m Time forall (m :: * -> *). MonadMonotonicTime m => m Time getMonotonicTime let ts32 = Time -> Word32 Mx.timestampMicrosecondsLow32Bits Time ts sdu' = SDU -> RemoteClockModel -> SDU Mx.setTimestamp SDU sdu (Word32 -> RemoteClockModel Mx.RemoteClockModel Word32 ts32) buf = SDU -> ByteString Mx.encodeSDU SDU sdu' traceWith tracer $ Mx.TraceSendStart (Mx.msHeader sdu') atomically $ writeTBQueue writeQueue buf traceWith tracer Mx.TraceSendEnd return ts