{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Network.Mux.Bearer.AttenuatedChannel
( AttenuatedChannel (..)
, Size
, SuccessOrFailure (..)
, Attenuation (..)
, QueueChannel
, newAttenuatedChannel
, echoQueueChannel
, newConnectedAttenuatedChannelPair
, attenuationChannelAsBearer
, AttenuatedChannelTrace (..)
, resourceVanishedIOError
) where
import Prelude hiding (read)
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Monad (when)
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer, traceWith)
import GHC.IO.Exception
import Data.ByteString.Lazy qualified as BL
import Data.Int (Int64)
import Network.Mux.Codec
import Network.Mux.Time
import Network.Mux.Timeout
import Network.Mux.Trace
import Network.Mux.Types
-- | Frames carried by a 'QueueChannel': either a close notification or a
-- chunk of payload bytes.
--
data Message =
      MsgClose
    | MsgBytes BL.ByteString
  deriving Eq
-- | One end of a queue-backed channel.
--
-- Each direction is a 'StrictTVar' which holds 'Just' the queue while that
-- direction is usable, and 'Nothing' once it has been shut down.
--
data QueueChannel m = QueueChannel {
    -- | Queue this end reads messages from.
    qcRead  :: StrictTVar m (Maybe (StrictTQueue m Message)),
    -- | Queue this end writes messages to.
    qcWrite :: StrictTVar m (Maybe (StrictTQueue m Message))
  }
-- | A 'QueueChannel' whose read and write sides share a single queue (and a
-- single 'StrictTVar'), so whatever is written to it can be read back from it.
--
echoQueueChannel :: MonadSTM m => STM m (QueueChannel m)
echoQueueChannel = do
    q <- newTQueue
    v <- newTVar (Just q)
    return QueueChannel {
        qcRead  = v,
        qcWrite = v
      }
-- | Take the next 'Message' from the read side of a 'QueueChannel'.
--
-- * If the read side has already been shut down ('qcRead' holds 'Nothing'),
--   a 'ResourceVanished' 'IOError' is thrown from within the transaction.
-- * If the message is 'MsgClose', the read side is marked as shut down and
--   the message is still returned to the caller.
--
readQueueChannel :: ( MonadSTM        m
                    , MonadThrow (STM m)
                    )
                 => QueueChannel m -> m Message
readQueueChannel QueueChannel { qcRead } =
    atomically $ do
      a <- readTVar qcRead >>= traverse readTQueue
      case a of
        Nothing           -> throwSTM (resourceVanishedIOError
                                        "AttenuatedChannel.readQueueChannel"
                                        "channel vanished")
        -- remember that the remote end closed, so later reads fail
        Just msg@MsgClose -> writeTVar qcRead Nothing
                          >> return msg
        Just msg          -> return msg
-- | Write a 'Message' to the write side of a 'QueueChannel'.
--
-- Returns 'False' when the write side was already shut down; in that case the
-- read side is marked as shut down too and nothing is enqueued.  Returns
-- 'True' when the message was enqueued.
--
-- Writing 'MsgClose' marks the write side as shut down /and/ enqueues the
-- close message, so that the reader of the queue can observe the close.
--
writeQueueChannel :: MonadSTM m
                  => QueueChannel m -> Message -> m Bool
writeQueueChannel QueueChannel { qcWrite, qcRead } msg =
    atomically $ do
      mq <- readTVar qcWrite
      case mq of
        Nothing -> do
          writeTVar qcRead Nothing
          return False
        Just q -> do
          case msg of
            MsgClose -> writeTVar qcWrite Nothing
            _        -> return ()
          -- every message, including 'MsgClose', must reach the queue:
          -- the reader only learns about a close by dequeuing 'MsgClose'.
          writeTQueue q msg
          return True
-- | Create two 'QueueChannel's connected back to back: the queue one of them
-- reads from is the queue the other one writes to, and vice versa.
--
-- Queues and 'StrictTVar's are labelled.
--
newConnectedQueueChannelPair :: ( MonadLabelledSTM m
                                )
                             => STM m ( QueueChannel m
                                      , QueueChannel m )
newConnectedQueueChannelPair = do
    read  <- newTQueue
    write <- newTQueue
    labelTQueue read  "qc-queue-read"
    labelTQueue write "qc-queue-write"
    q  <- QueueChannel <$> newTVar (Just read)
                       <*> newTVar (Just write)
    labelTVar (qcRead  q) "qc-read"
    labelTVar (qcWrite q) "qc-write"
    -- the second end uses the same two queues with the roles swapped
    q' <- QueueChannel <$> newTVar (Just write)
                       <*> newTVar (Just read)
    labelTVar (qcRead  q') "qc-read'"
    labelTVar (qcWrite q') "qc-write'"
    return (q, q')
-- | A channel exposed as a record of actions: read a chunk of bytes, write a
-- chunk of bytes, and close.
--
data AttenuatedChannel m = AttenuatedChannel {
    -- | Read the next chunk of bytes.
    acRead  :: m BL.ByteString,
    -- | Write a chunk of bytes.
    acWrite :: BL.ByteString -> m (),
    -- | Close the channel.
    acClose :: m ()
  }
-- | Verdict of an attenuation function: either the operation goes through,
-- or it fails with the given 'IOError'.
--
data SuccessOrFailure
  = Success
  | Failure IOError
-- | Size argument passed to an attenuation function (same type as the result
-- of 'BL.length').
--
type Size = Int64
-- | Parameters which shape the behaviour of an attenuated channel.
--
data Attenuation = Attenuation {
    -- | Given a point in time and a size, return the delay to apply and
    -- whether the read succeeds or fails.
    aReadAttenuation  :: Time -> Size -> ( DiffTime,
                                           SuccessOrFailure ),
    -- | Optional limit on the number of writes; 'Nothing' means no limit.
    aWriteAttenuation :: Maybe Int
  }
-- | Wrap a 'QueueChannel' into an 'AttenuatedChannel'.
--
-- * Reads are delayed (and possibly failed) according to 'aReadAttenuation'.
-- * Writes are counted; when 'aWriteAttenuation' is @'Just' limit@, a write
--   throws a 'ResourceVanished' 'IOError' once the number of write attempts
--   reaches @limit@.
-- * Closing sends 'MsgClose' through the 'QueueChannel' and traces
--   'AttChannLocalClose'.
--
newAttenuatedChannel :: forall m.
                        ( MonadDelay m
                        , MonadTimer m
                        , MonadThrow m
                        , MonadThrow (STM m)
                        )
                     => Tracer m AttenuatedChannelTrace
                     -> Attenuation
                     -> QueueChannel m
                     -> STM m (AttenuatedChannel m)
newAttenuatedChannel tr Attenuation { aReadAttenuation,
                                      aWriteAttenuation } qc = do
    writeCounterVar <- newTVar 0
    return AttenuatedChannel { acRead
                             , acWrite = acWrite writeCounterVar
                             , acClose
                             }
  where
    -- Read one message and apply the read attenuation to it.
    acRead :: m BL.ByteString
    acRead = do
      msg <- readQueueChannel qc
      t <- getMonotonicTime
      case msg of
        -- A close message is attenuated as if it had size 1; the
        -- success / failure verdict is ignored and 'BearerClosed' is thrown
        -- after the delay.
        MsgClose -> do
          case aReadAttenuation t 1 of
            ( d, _       ) -> traceWith tr AttChannRemoteClose
                           >> threadDelay d
                           >> throwIO (BearerClosed "closed when reading data")

        MsgBytes bs ->
          case aReadAttenuation t (BL.length bs) of
            ( d, Success     ) -> threadDelay d
                               >> return bs

            ( d, Failure ioe ) -> threadDelay d
                               >> throwIO ioe

    -- Count the write attempt, enforce the optional write limit, then pass
    -- the bytes to the underlying 'QueueChannel'.
    acWrite :: StrictTVar m Int
            -> BL.ByteString
            -> m ()
    acWrite writeCounterVar bs = do
      -- the counter is bumped before the limit is checked
      wCount <- atomically $ do
        modifyTVar writeCounterVar succ
        readTVar writeCounterVar
      case aWriteAttenuation of
        Just limit  | wCount >= limit
                   -> throwIO $
                        resourceVanishedIOError
                          "AttenuatedChannel.write"
                          "write limit reached (write attenuation)"
        _          -> return ()

      sent <- writeQueueChannel qc (MsgBytes bs)
      -- 'False' means the write side of the queue channel was shut down
      when (not sent) $
        throwIO (resourceVanishedIOError "AttenuatedChannel.write" "")

    -- Send 'MsgClose' and trace whether it was accepted by the queue channel.
    acClose :: m ()
    acClose = do
      sent <- writeQueueChannel qc MsgClose
      traceWith tr (AttChannLocalClose sent)
-- | Create a pair of connected 'AttenuatedChannel's.
--
-- The first tracer and the first 'Attenuation' are used for the first channel
-- of the returned pair, the second tracer and the second 'Attenuation' for
-- the second one.
--
newConnectedAttenuatedChannelPair
    :: forall m.
       ( MonadDelay       m
       , MonadLabelledSTM m
       , MonadTimer       m
       , MonadThrow       m
       , MonadThrow  (STM m)
       )
    => Tracer m AttenuatedChannelTrace
    -> Tracer m AttenuatedChannelTrace
    -> Attenuation
    -> Attenuation
    -> STM m (AttenuatedChannel m, AttenuatedChannel m)
newConnectedAttenuatedChannelPair tr tr' attenuation attenuation' = do
    (c, c') <- newConnectedQueueChannelPair
    b  <- newAttenuatedChannel tr  attenuation  c
    b' <- newAttenuatedChannel tr' attenuation' c'
    return (b, b')
-- | Present an 'AttenuatedChannel' as a 'Bearer'.
--
-- Arguments, in order: the SDU size stored in the 'Bearer', the timeout
-- applied to each read, the tracer, and the underlying channel.
--
-- A read which times out throws 'SDUReadTimeout'; a header which fails to
-- decode throws the decoder's error.
--
attenuationChannelAsBearer :: forall m.
                              ( MonadThrow         m
                              , MonadMonotonicTime m
                              )
                           => SDUSize
                           -> DiffTime
                           -> Tracer m Trace
                           -> AttenuatedChannel m
                           -> Bearer m
attenuationChannelAsBearer sduSize sduTimeout muxTracer chan =
    Bearer {
      read      = readMux,
      write     = writeMux,
      sduSize,
      name      = "attenuation-channel"
    }
  where
    -- Read one chunk from the channel (under the read timeout) and decode it
    -- as an SDU; returns the SDU together with the time it was received.
    readMux :: TimeoutFn m -> m (SDU, Time)
    readMux timeoutFn = do
      traceWith muxTracer TraceRecvHeaderStart
      mbuf <- timeoutFn sduTimeout $ acRead chan
      case mbuf of
        Nothing -> do
          traceWith muxTracer TraceSDUReadTimeoutException
          throwIO SDUReadTimeout

        Just buf -> do
          -- the first 8 bytes are decoded as the header, the rest is the
          -- payload
          let (hbuf, payload) = BL.splitAt 8 buf
          case decodeSDU hbuf of
            Left  e      -> throwIO e
            Right muxsdu -> do
              let header = msHeader muxsdu
              traceWith muxTracer $ TraceRecvHeaderEnd header
              ts <- getMonotonicTime
              traceWith muxTracer $ TraceRecvDeltaQObservation header ts
              return (muxsdu {msBlob = payload}, ts)

    -- Stamp the SDU with the current time, encode it and write it to the
    -- channel in one piece; the timeout function is not used.
    writeMux :: TimeoutFn m -> SDU -> m Time
    writeMux _ sdu = do
      ts <- getMonotonicTime
      let ts32 = timestampMicrosecondsLow32Bits ts
          sdu' = setTimestamp sdu (RemoteClockModel ts32)
          buf  = encodeSDU sdu'
      traceWith muxTracer $ TraceSendStart (msHeader sdu')
      acWrite chan buf

      traceWith muxTracer TraceSendEnd
      return ts
-- | Trace events emitted by an attenuated channel.
--
data AttenuatedChannelTrace =
    -- | The channel was closed locally; the 'Bool' is the result of sending
    -- the close message through the underlying queue channel.
    AttChannLocalClose Bool
    -- | A close message was received from the other end.
  | AttChannRemoteClose
  deriving Show
-- | Build a 'ResourceVanished' 'IOError' from a location and a description.
--
-- The handle, errno and filename fields are left empty.
--
resourceVanishedIOError :: String -> String -> IOError
resourceVanishedIOError ioe_location ioe_description = IOError
  { ioe_handle      = Nothing
  , ioe_type        = ResourceVanished
  , ioe_location    = ioe_location
  , ioe_description = ioe_description
  , ioe_errno       = Nothing
  , ioe_filename    = Nothing
  }