{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE RankNTypes          #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Network.Mux.Bearer.Queues
  ( QueueChannel (..)
  , queueChannelAsBearer
  ) where

import Data.ByteString.Lazy qualified as BL

import Control.Concurrent.Class.MonadSTM.Strict
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Control.Tracer

import Network.Mux.Codec qualified as Mx
import Network.Mux.Time as Mx
import Network.Mux.Timeout qualified as Mx
import Network.Mux.Trace qualified as Mx
import Network.Mux.Types (Bearer)
import Network.Mux.Types qualified as Mx

data QueueChannel m = QueueChannel {
    forall (m :: * -> *). QueueChannel m -> StrictTBQueue m ByteString
readQueue  :: StrictTBQueue m BL.ByteString,
    forall (m :: * -> *). QueueChannel m -> StrictTBQueue m ByteString
writeQueue :: StrictTBQueue m BL.ByteString
  }


queueChannelAsBearer
  :: forall m.
     ( MonadSTM   m
     , MonadMonotonicTime m
     , MonadThrow m
     )
  => Mx.SDUSize
  -> Tracer m Mx.Trace
  -> QueueChannel m
  -> Bearer m
queueChannelAsBearer :: forall (m :: * -> *).
(MonadSTM m, MonadMonotonicTime m, MonadThrow m) =>
SDUSize -> Tracer m Trace -> QueueChannel m -> Bearer m
queueChannelAsBearer SDUSize
sduSize Tracer m Trace
tracer QueueChannel { StrictTBQueue m ByteString
writeQueue :: forall (m :: * -> *). QueueChannel m -> StrictTBQueue m ByteString
writeQueue :: StrictTBQueue m ByteString
writeQueue, StrictTBQueue m ByteString
readQueue :: forall (m :: * -> *). QueueChannel m -> StrictTBQueue m ByteString
readQueue :: StrictTBQueue m ByteString
readQueue } = do
      Mx.Bearer {
        read :: TimeoutFn m -> m (SDU, Time)
Mx.read    = TimeoutFn m -> m (SDU, Time)
readMux,
        write :: TimeoutFn m -> SDU -> m Time
Mx.write   = TimeoutFn m -> SDU -> m Time
writeMux,
        sduSize :: SDUSize
Mx.sduSize = SDUSize
sduSize,
        name :: String
Mx.name    = String
"queue-channel"
      }
    where
      readMux :: Mx.TimeoutFn m -> m (Mx.SDU, Time)
      readMux :: TimeoutFn m -> m (SDU, Time)
readMux TimeoutFn m
_ = do
          Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer Trace
Mx.TraceRecvHeaderStart
          buf <- STM m ByteString -> m ByteString
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m ByteString -> m ByteString)
-> STM m ByteString -> m ByteString
forall a b. (a -> b) -> a -> b
$ StrictTBQueue m ByteString -> STM m ByteString
forall (m :: * -> *) a. MonadSTM m => StrictTBQueue m a -> STM m a
readTBQueue StrictTBQueue m ByteString
readQueue
          let (hbuf, payload) = BL.splitAt 8 buf
          case Mx.decodeSDU hbuf of
              Left  Error
e      -> Error -> m (SDU, Time)
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO Error
e
              Right SDU
header -> do
                  Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer (Trace -> m ()) -> Trace -> m ()
forall a b. (a -> b) -> a -> b
$ SDUHeader -> Trace
Mx.TraceRecvHeaderEnd (SDU -> SDUHeader
Mx.msHeader SDU
header)
                  ts <- m Time
forall (m :: * -> *). MonadMonotonicTime m => m Time
getMonotonicTime
                  traceWith tracer $ Mx.TraceRecvDeltaQObservation (Mx.msHeader header) ts
                  return (header {Mx.msBlob = payload}, ts)

      writeMux :: Mx.TimeoutFn m -> Mx.SDU -> m Time
      writeMux :: TimeoutFn m -> SDU -> m Time
writeMux TimeoutFn m
_ SDU
sdu = do
          ts <- m Time
forall (m :: * -> *). MonadMonotonicTime m => m Time
getMonotonicTime
          let ts32 = Time -> Word32
Mx.timestampMicrosecondsLow32Bits Time
ts
              sdu' = SDU -> RemoteClockModel -> SDU
Mx.setTimestamp SDU
sdu (Word32 -> RemoteClockModel
Mx.RemoteClockModel Word32
ts32)
              buf  = SDU -> ByteString
Mx.encodeSDU SDU
sdu'
          traceWith tracer $ Mx.TraceSendStart (Mx.msHeader sdu')
          atomically $ writeTBQueue writeQueue buf
          traceWith tracer Mx.TraceSendEnd
          return ts