{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTSyntax #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
module Network.Mux
(
new
, Mux
, Mode (..)
, HasInitiator
, HasResponder
, MiniProtocolInfo (..)
, MiniProtocolNum (..)
, MiniProtocolDirection (..)
, MiniProtocolLimits (..)
, run
, stop
, runMiniProtocol
, StartOnDemandOrEagerly (..)
, ByteChannel
, Channel (..)
, Bearer
, MakeBearer (..)
, SDUSize (..)
, miniProtocolStateMap
, stopped
, Error (..)
, RuntimeError (..)
, traceBearerState
, BearerState (..)
, Trace (..)
, WithBearer (..)
) where
import Data.ByteString.Lazy qualified as BL
import Data.Int (Int64)
import Data.Map (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe (isNothing)
import Data.Monoid.Synchronisation (FirstToFinish (..))
import Control.Applicative
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Concurrent.JobPool qualified as JobPool
import Control.Exception (SomeAsyncException (..))
import Control.Monad
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTimer.SI hiding (timeout)
import Control.Tracer
import Network.Mux.Bearer
import Network.Mux.Channel
import Network.Mux.Egress as Egress
import Network.Mux.Ingress as Ingress
import Network.Mux.Timeout
import Network.Mux.Trace
import Network.Mux.Types
data Mux (mode :: Mode) m =
Mux {
forall (mode :: Mode) (m :: * -> *).
Mux mode m
-> Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: !(Map (MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m)),
forall (mode :: Mode) (m :: * -> *).
Mux mode m -> StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue :: !(StrictTQueue m (ControlCmd mode m)),
forall (mode :: Mode) (m :: * -> *).
Mux mode m -> StrictTVar m Status
muxStatus :: StrictTVar m Status
}
miniProtocolStateMap :: MonadSTM m
=> Mux mode m
-> Map (MiniProtocolNum, MiniProtocolDir)
(STM m MiniProtocolStatus)
miniProtocolStateMap :: forall (m :: * -> *) (mode :: Mode).
MonadSTM m =>
Mux mode m
-> Map
(MiniProtocolNum, MiniProtocolDir) (STM m MiniProtocolStatus)
miniProtocolStateMap = (MiniProtocolState mode m -> STM m MiniProtocolStatus)
-> Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
-> Map
(MiniProtocolNum, MiniProtocolDir) (STM m MiniProtocolStatus)
forall a b.
(a -> b)
-> Map (MiniProtocolNum, MiniProtocolDir) a
-> Map (MiniProtocolNum, MiniProtocolDir) b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (StrictTVar m MiniProtocolStatus -> STM m MiniProtocolStatus
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar (StrictTVar m MiniProtocolStatus -> STM m MiniProtocolStatus)
-> (MiniProtocolState mode m -> StrictTVar m MiniProtocolStatus)
-> MiniProtocolState mode m
-> STM m MiniProtocolStatus
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MiniProtocolState mode m -> StrictTVar m MiniProtocolStatus
forall (mode :: Mode) (m :: * -> *).
MiniProtocolState mode m -> StrictTVar m MiniProtocolStatus
miniProtocolStatusVar)
(Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
-> Map
(MiniProtocolNum, MiniProtocolDir) (STM m MiniProtocolStatus))
-> (Mux mode m
-> Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m))
-> Mux mode m
-> Map
(MiniProtocolNum, MiniProtocolDir) (STM m MiniProtocolStatus)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Mux mode m
-> Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
forall (mode :: Mode) (m :: * -> *).
Mux mode m
-> Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols
stopped :: MonadSTM m => Mux mode m -> STM m (Maybe SomeException)
stopped :: forall (m :: * -> *) (mode :: Mode).
MonadSTM m =>
Mux mode m -> STM m (Maybe SomeException)
stopped Mux { StrictTVar m Status
muxStatus :: forall (mode :: Mode) (m :: * -> *).
Mux mode m -> StrictTVar m Status
muxStatus :: StrictTVar m Status
muxStatus } =
StrictTVar m Status -> STM m Status
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m Status
muxStatus STM m Status
-> (Status -> STM m (Maybe SomeException))
-> STM m (Maybe SomeException)
forall a b. STM m a -> (a -> STM m b) -> STM m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Status
Ready -> STM m (Maybe SomeException)
forall a. STM m a
forall (m :: * -> *) a. MonadSTM m => STM m a
retry
Failed SomeException
err -> Maybe SomeException -> STM m (Maybe SomeException)
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return (SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just SomeException
err)
Status
Stopping -> STM m (Maybe SomeException)
forall a. STM m a
forall (m :: * -> *) a. MonadSTM m => STM m a
retry
Status
Stopped -> Maybe SomeException -> STM m (Maybe SomeException)
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe SomeException
forall a. Maybe a
Nothing
new :: forall (mode :: Mode) m.
MonadLabelledSTM m
=> [MiniProtocolInfo mode]
-> m (Mux mode m)
new :: forall (mode :: Mode) (m :: * -> *).
MonadLabelledSTM m =>
[MiniProtocolInfo mode] -> m (Mux mode m)
new [MiniProtocolInfo mode]
ptcls = do
muxMiniProtocols <- [MiniProtocolInfo mode]
-> m (Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m))
forall (m :: * -> *) (mode :: Mode).
MonadSTM m =>
[MiniProtocolInfo mode]
-> m (Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m))
mkMiniProtocolStateMap [MiniProtocolInfo mode]
ptcls
muxControlCmdQueue <- atomically newTQueue
muxStatus <- newTVarIO Ready
return Mux {
muxMiniProtocols,
muxControlCmdQueue,
muxStatus
}
mkMiniProtocolStateMap :: MonadSTM m
=> [MiniProtocolInfo mode]
-> m (Map (MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m))
mkMiniProtocolStateMap :: forall (m :: * -> *) (mode :: Mode).
MonadSTM m =>
[MiniProtocolInfo mode]
-> m (Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m))
mkMiniProtocolStateMap [MiniProtocolInfo mode]
ptcls =
[((MiniProtocolNum, MiniProtocolDir), MiniProtocolState mode m)]
-> Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList ([((MiniProtocolNum, MiniProtocolDir), MiniProtocolState mode m)]
-> Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m))
-> m [((MiniProtocolNum, MiniProtocolDir),
MiniProtocolState mode m)]
-> m (Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
[m ((MiniProtocolNum, MiniProtocolDir), MiniProtocolState mode m)]
-> m [((MiniProtocolNum, MiniProtocolDir),
MiniProtocolState mode m)]
forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
forall (m :: * -> *) a. Monad m => [m a] -> m [a]
sequence
[ do state <- MiniProtocolInfo mode -> m (MiniProtocolState mode m)
forall (m :: * -> *) (mode :: Mode).
MonadSTM m =>
MiniProtocolInfo mode -> m (MiniProtocolState mode m)
mkMiniProtocolState MiniProtocolInfo mode
ptcl
return ((miniProtocolNum, protocolDirEnum miniProtocolDir), state)
| ptcl :: MiniProtocolInfo mode
ptcl@MiniProtocolInfo {MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum :: forall (mode :: Mode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum, MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: Mode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir} <- [MiniProtocolInfo mode]
ptcls ]
mkMiniProtocolState :: MonadSTM m
=> MiniProtocolInfo mode
-> m (MiniProtocolState mode m)
mkMiniProtocolState :: forall (m :: * -> *) (mode :: Mode).
MonadSTM m =>
MiniProtocolInfo mode -> m (MiniProtocolState mode m)
mkMiniProtocolState MiniProtocolInfo mode
miniProtocolInfo = do
miniProtocolIngressQueue <- ByteString -> m (StrictTVar m ByteString)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ByteString
BL.empty
miniProtocolStatusVar <- newTVarIO StatusIdle
return MiniProtocolState {
miniProtocolInfo,
miniProtocolIngressQueue,
miniProtocolStatusVar
}
stop :: MonadSTM m => Mux mode m -> m ()
stop :: forall (m :: * -> *) (mode :: Mode).
MonadSTM m =>
Mux mode m -> m ()
stop Mux{StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue :: forall (mode :: Mode) (m :: * -> *).
Mux mode m -> StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue :: StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue} =
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTQueue m (ControlCmd mode m) -> ControlCmd mode m -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTQueue m a -> a -> STM m ()
writeTQueue StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue ControlCmd mode m
forall (mode :: Mode) (m :: * -> *). ControlCmd mode m
CmdShutdown
data Group = MuxJob
| MiniProtocolJob
deriving (Group -> Group -> Bool
(Group -> Group -> Bool) -> (Group -> Group -> Bool) -> Eq Group
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: Group -> Group -> Bool
== :: Group -> Group -> Bool
$c/= :: Group -> Group -> Bool
/= :: Group -> Group -> Bool
Eq, Eq Group
Eq Group =>
(Group -> Group -> Ordering)
-> (Group -> Group -> Bool)
-> (Group -> Group -> Bool)
-> (Group -> Group -> Bool)
-> (Group -> Group -> Bool)
-> (Group -> Group -> Group)
-> (Group -> Group -> Group)
-> Ord Group
Group -> Group -> Bool
Group -> Group -> Ordering
Group -> Group -> Group
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: Group -> Group -> Ordering
compare :: Group -> Group -> Ordering
$c< :: Group -> Group -> Bool
< :: Group -> Group -> Bool
$c<= :: Group -> Group -> Bool
<= :: Group -> Group -> Bool
$c> :: Group -> Group -> Bool
> :: Group -> Group -> Bool
$c>= :: Group -> Group -> Bool
>= :: Group -> Group -> Bool
$cmax :: Group -> Group -> Group
max :: Group -> Group -> Group
$cmin :: Group -> Group -> Group
min :: Group -> Group -> Group
Ord)
run :: forall m mode.
( MonadAsync m
, MonadFork m
, MonadLabelledSTM m
, Alternative (STM m)
, MonadThrow (STM m)
, MonadTimer m
, MonadMask m
)
=> Tracer m Trace
-> Mux mode m
-> Bearer m
-> m ()
run :: forall (m :: * -> *) (mode :: Mode).
(MonadAsync m, MonadFork m, MonadLabelledSTM m,
Alternative (STM m), MonadThrow (STM m), MonadTimer m,
MonadMask m) =>
Tracer m Trace -> Mux mode m -> Bearer m -> m ()
run Tracer m Trace
tracer Mux {Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: forall (mode :: Mode) (m :: * -> *).
Mux mode m
-> Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols, StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue :: forall (mode :: Mode) (m :: * -> *).
Mux mode m -> StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue :: StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue, StrictTVar m Status
muxStatus :: forall (mode :: Mode) (m :: * -> *).
Mux mode m -> StrictTVar m Status
muxStatus :: StrictTVar m Status
muxStatus} bearer :: Bearer m
bearer@Bearer {String
name :: String
name :: forall (m :: * -> *). Bearer m -> String
name} = do
egressQueue <- STM m (StrictTBQueue m (TranslocationServiceRequest m))
-> m (StrictTBQueue m (TranslocationServiceRequest m))
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (StrictTBQueue m (TranslocationServiceRequest m))
-> m (StrictTBQueue m (TranslocationServiceRequest m)))
-> STM m (StrictTBQueue m (TranslocationServiceRequest m))
-> m (StrictTBQueue m (TranslocationServiceRequest m))
forall a b. (a -> b) -> a -> b
$ Natural -> STM m (StrictTBQueue m (TranslocationServiceRequest m))
forall (m :: * -> *) a.
MonadSTM m =>
Natural -> STM m (StrictTBQueue m a)
newTBQueue Natural
100
labelTBQueueIO egressQueue (name ++ "-mux-egress")
labelTVarIO muxStatus (name ++ "-mux-status")
labelTQueueIO muxControlCmdQueue (name ++ "-mux-ctrl")
JobPool.withJobPool
(\JobPool Group m JobResult
jobpool -> do
JobPool Group m JobResult -> Job Group m JobResult -> m ()
forall group (m :: * -> *) a.
(MonadAsync m, MonadMask m, Ord group) =>
JobPool group m a -> Job group m a -> m ()
JobPool.forkJob JobPool Group m JobResult
jobpool (StrictTBQueue m (TranslocationServiceRequest m)
-> Job Group m JobResult
muxerJob StrictTBQueue m (TranslocationServiceRequest m)
egressQueue)
JobPool Group m JobResult -> Job Group m JobResult -> m ()
forall group (m :: * -> *) a.
(MonadAsync m, MonadMask m, Ord group) =>
JobPool group m a -> Job group m a -> m ()
JobPool.forkJob JobPool Group m JobResult
jobpool Job Group m JobResult
demuxerJob
Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer (BearerState -> Trace
TraceState BearerState
Mature)
(TimeoutFn m -> m ()) -> m ()
forall (m :: * -> *) b.
(MonadAsync m, MonadFork m, MonadMonotonicTime m, MonadTimer m,
MonadMask m, MonadThrow (STM m)) =>
(TimeoutFn m -> m b) -> m b
withTimeoutSerial ((TimeoutFn m -> m ()) -> m ()) -> (TimeoutFn m -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \TimeoutFn m
timeout ->
Tracer m Trace
-> TimeoutFn m
-> JobPool Group m JobResult
-> StrictTBQueue m (TranslocationServiceRequest m)
-> StrictTQueue m (ControlCmd mode m)
-> StrictTVar m Status
-> m ()
forall (mode :: Mode) (m :: * -> *).
(MonadAsync m, MonadMask m, Alternative (STM m),
MonadThrow (STM m)) =>
Tracer m Trace
-> TimeoutFn m
-> JobPool Group m JobResult
-> EgressQueue m
-> StrictTQueue m (ControlCmd mode m)
-> StrictTVar m Status
-> m ()
monitor Tracer m Trace
tracer
DiffTime -> m a -> m (Maybe a)
TimeoutFn m
timeout
JobPool Group m JobResult
jobpool
StrictTBQueue m (TranslocationServiceRequest m)
egressQueue
StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue
StrictTVar m Status
muxStatus
)
m () -> (SomeAsyncException -> m ()) -> m ()
forall e a. Exception e => m a -> (e -> m a) -> m a
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` \(SomeAsyncException e
e) -> do
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTVar m Status -> Status -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m Status
muxStatus (SomeException -> Status
Failed (SomeException -> Status) -> SomeException -> Status
forall a b. (a -> b) -> a -> b
$ e -> SomeException
forall e. Exception e => e -> SomeException
toException e
e)
e -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO e
e
where
muxerJob :: StrictTBQueue m (TranslocationServiceRequest m)
-> Job Group m JobResult
muxerJob StrictTBQueue m (TranslocationServiceRequest m)
egressQueue =
m JobResult
-> (SomeException -> m JobResult)
-> Group
-> String
-> Job Group m JobResult
forall group (m :: * -> *) a.
m a -> (SomeException -> m a) -> group -> String -> Job group m a
JobPool.Job (StrictTBQueue m (TranslocationServiceRequest m)
-> Bearer m -> m JobResult
forall (m :: * -> *) void.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
MonadTimer m) =>
EgressQueue m -> Bearer m -> m void
muxer StrictTBQueue m (TranslocationServiceRequest m)
egressQueue Bearer m
bearer)
(JobResult -> m JobResult
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (JobResult -> m JobResult)
-> (SomeException -> JobResult) -> SomeException -> m JobResult
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SomeException -> JobResult
MuxerException)
Group
MuxJob
(String
name String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
"-muxer")
demuxerJob :: Job Group m JobResult
demuxerJob =
m JobResult
-> (SomeException -> m JobResult)
-> Group
-> String
-> Job Group m JobResult
forall group (m :: * -> *) a.
m a -> (SomeException -> m a) -> group -> String -> Job group m a
JobPool.Job ([MiniProtocolState mode m] -> Bearer m -> m JobResult
forall (m :: * -> *) (mode :: Mode) void.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
MonadTimer m) =>
[MiniProtocolState mode m] -> Bearer m -> m void
demuxer (Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
-> [MiniProtocolState mode m]
forall k a. Map k a -> [a]
Map.elems Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols) Bearer m
bearer)
(JobResult -> m JobResult
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (JobResult -> m JobResult)
-> (SomeException -> JobResult) -> SomeException -> m JobResult
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SomeException -> JobResult
DemuxerException)
Group
MuxJob
(String
name String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
"-demuxer")
miniProtocolJob
:: forall mode m.
( MonadSTM m
, MonadThread m
, MonadThrow (STM m)
)
=> Tracer m Trace
-> EgressQueue m
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> JobPool.Job Group m JobResult
miniProtocolJob :: forall (mode :: Mode) (m :: * -> *).
(MonadSTM m, MonadThread m, MonadThrow (STM m)) =>
Tracer m Trace
-> EgressQueue m
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> Job Group m JobResult
miniProtocolJob Tracer m Trace
tracer EgressQueue m
egressQueue
MiniProtocolState {
miniProtocolInfo :: forall (mode :: Mode) (m :: * -> *).
MiniProtocolState mode m -> MiniProtocolInfo mode
miniProtocolInfo =
MiniProtocolInfo {
MiniProtocolNum
miniProtocolNum :: forall (mode :: Mode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum,
MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: Mode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir
},
IngressQueue m
miniProtocolIngressQueue :: forall (mode :: Mode) (m :: * -> *).
MiniProtocolState mode m -> IngressQueue m
miniProtocolIngressQueue :: IngressQueue m
miniProtocolIngressQueue,
StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: forall (mode :: Mode) (m :: * -> *).
MiniProtocolState mode m -> StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: StrictTVar m MiniProtocolStatus
miniProtocolStatusVar
}
MiniProtocolAction {
ByteChannel m -> m (a, Maybe ByteString)
miniProtocolAction :: ByteChannel m -> m (a, Maybe ByteString)
miniProtocolAction :: ()
miniProtocolAction,
StrictTMVar m (Either SomeException a)
completionVar :: StrictTMVar m (Either SomeException a)
completionVar :: ()
completionVar
} =
m JobResult
-> (SomeException -> m JobResult)
-> Group
-> String
-> Job Group m JobResult
forall group (m :: * -> *) a.
m a -> (SomeException -> m a) -> group -> String -> Job group m a
JobPool.Job m JobResult
jobAction
SomeException -> m JobResult
jobHandler
Group
MiniProtocolJob
(MiniProtocolNum -> String
forall a. Show a => a -> String
show MiniProtocolNum
miniProtocolNum String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
"." String -> String -> String
forall a. [a] -> [a] -> [a]
++ MiniProtocolDir -> String
forall a. Show a => a -> String
show MiniProtocolDir
miniProtocolDirEnum)
where
jobAction :: m JobResult
jobAction = do
String -> m ()
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread (case MiniProtocolNum
miniProtocolNum of
MiniProtocolNum Word16
a -> String
"prtcl-" String -> String -> String
forall a. [a] -> [a] -> [a]
++ Word16 -> String
forall a. Show a => a -> String
show Word16
a)
w <- ByteString -> m (IngressQueue m)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ByteString
BL.empty
let chan = Tracer m Trace
-> EgressQueue m
-> Wanton m
-> MiniProtocolNum
-> MiniProtocolDir
-> IngressQueue m
-> ByteChannel m
forall (m :: * -> *).
MonadSTM m =>
Tracer m Trace
-> EgressQueue m
-> Wanton m
-> MiniProtocolNum
-> MiniProtocolDir
-> IngressQueue m
-> ByteChannel m
muxChannel Tracer m Trace
tracer EgressQueue m
egressQueue (IngressQueue m -> Wanton m
forall (m :: * -> *). StrictTVar m ByteString -> Wanton m
Wanton IngressQueue m
w)
MiniProtocolNum
miniProtocolNum MiniProtocolDir
miniProtocolDirEnum
IngressQueue m
miniProtocolIngressQueue
(result, remainder) <- miniProtocolAction chan
traceWith tracer (TraceTerminating miniProtocolNum miniProtocolDirEnum)
atomically $ do
readTVar w >>= check . BL.null
writeTVar miniProtocolStatusVar StatusIdle
putTMVar completionVar (Right result)
`orElse` throwSTM (BlockedOnCompletionVar miniProtocolNum)
case remainder of
Just ByteString
trailing ->
IngressQueue m -> (ByteString -> ByteString) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> (a -> a) -> STM m ()
modifyTVar IngressQueue m
miniProtocolIngressQueue (ByteString -> ByteString -> ByteString
BL.append ByteString
trailing)
Maybe ByteString
Nothing ->
() -> STM m ()
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
return (MiniProtocolShutdown miniProtocolNum miniProtocolDirEnum)
jobHandler :: SomeException -> m JobResult
jobHandler :: SomeException -> m JobResult
jobHandler SomeException
e = do
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$
StrictTMVar m (Either SomeException a)
-> Either SomeException a -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTMVar m a -> a -> STM m ()
putTMVar StrictTMVar m (Either SomeException a)
completionVar (SomeException -> Either SomeException a
forall a b. a -> Either a b
Left SomeException
e)
STM m () -> STM m () -> STM m ()
forall a. STM m a -> STM m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => STM m a -> STM m a -> STM m a
`orElse`
RuntimeError -> STM m ()
forall (m :: * -> *) e a.
(MonadSTM m, MonadThrow (STM m), Exception e) =>
e -> STM m a
throwSTM (MiniProtocolNum -> RuntimeError
BlockedOnCompletionVar MiniProtocolNum
miniProtocolNum)
JobResult -> m JobResult
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (MiniProtocolNum -> MiniProtocolDir -> SomeException -> JobResult
MiniProtocolException MiniProtocolNum
miniProtocolNum MiniProtocolDir
miniProtocolDirEnum SomeException
e)
miniProtocolDirEnum :: MiniProtocolDir
miniProtocolDirEnum :: MiniProtocolDir
miniProtocolDirEnum = MiniProtocolDirection mode -> MiniProtocolDir
forall (mode :: Mode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir
data ControlCmd mode m =
CmdStartProtocolThread
!StartOnDemandOrEagerly
!(MiniProtocolState mode m)
!(MiniProtocolAction m)
| CmdShutdown
data StartOnDemandOrEagerly =
StartEagerly
| StartOnDemand
deriving StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool
(StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool)
-> (StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool)
-> Eq StartOnDemandOrEagerly
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool
== :: StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool
$c/= :: StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool
/= :: StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool
Eq
data MiniProtocolAction m where
MiniProtocolAction :: forall m a.
{ ()
miniProtocolAction :: ByteChannel m -> m (a, Maybe BL.ByteString),
()
completionVar :: StrictTMVar m (Either SomeException a)
}
-> MiniProtocolAction m
type MiniProtocolKey = (MiniProtocolNum, MiniProtocolDir)
newtype MonitorCtx m mode = MonitorCtx {
forall (m :: * -> *) (mode :: Mode).
MonitorCtx m mode
-> Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
mcOnDemandProtocols :: Map MiniProtocolKey
(MiniProtocolState mode m, MiniProtocolAction m)
}
monitor :: forall mode m.
( MonadAsync m
, MonadMask m
, Alternative (STM m)
, MonadThrow (STM m)
)
=> Tracer m Trace
-> TimeoutFn m
-> JobPool.JobPool Group m JobResult
-> EgressQueue m
-> StrictTQueue m (ControlCmd mode m)
-> StrictTVar m Status
-> m ()
monitor :: forall (mode :: Mode) (m :: * -> *).
(MonadAsync m, MonadMask m, Alternative (STM m),
MonadThrow (STM m)) =>
Tracer m Trace
-> TimeoutFn m
-> JobPool Group m JobResult
-> EgressQueue m
-> StrictTQueue m (ControlCmd mode m)
-> StrictTVar m Status
-> m ()
monitor Tracer m Trace
tracer TimeoutFn m
timeout JobPool Group m JobResult
jobpool EgressQueue m
egressQueue StrictTQueue m (ControlCmd mode m)
cmdQueue StrictTVar m Status
muxStatus =
MonitorCtx m mode -> m ()
go (Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
-> MonitorCtx m mode
forall (m :: * -> *) (mode :: Mode).
Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
-> MonitorCtx m mode
MonitorCtx Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
forall k a. Map k a
Map.empty)
where
go :: MonitorCtx m mode -> m ()
go :: MonitorCtx m mode -> m ()
go !monitorCtx :: MonitorCtx m mode
monitorCtx@MonitorCtx { Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
mcOnDemandProtocols :: forall (m :: * -> *) (mode :: Mode).
MonitorCtx m mode
-> Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
mcOnDemandProtocols :: Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
mcOnDemandProtocols } = do
result <- STM m (MonitorEvent mode m) -> m (MonitorEvent mode m)
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (MonitorEvent mode m) -> m (MonitorEvent mode m))
-> STM m (MonitorEvent mode m) -> m (MonitorEvent mode m)
forall a b. (a -> b) -> a -> b
$ FirstToFinish (STM m) (MonitorEvent mode m)
-> STM m (MonitorEvent mode m)
forall (m :: * -> *) a. FirstToFinish m a -> m a
runFirstToFinish (FirstToFinish (STM m) (MonitorEvent mode m)
-> STM m (MonitorEvent mode m))
-> FirstToFinish (STM m) (MonitorEvent mode m)
-> STM m (MonitorEvent mode m)
forall a b. (a -> b) -> a -> b
$
STM m (MonitorEvent mode m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
forall (m :: * -> *) a. m a -> FirstToFinish m a
FirstToFinish (JobResult -> MonitorEvent mode m
forall (mode :: Mode) (m :: * -> *).
JobResult -> MonitorEvent mode m
EventJobResult (JobResult -> MonitorEvent mode m)
-> STM m JobResult -> STM m (MonitorEvent mode m)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> JobPool Group m JobResult -> STM m JobResult
forall (m :: * -> *) group a.
MonadSTM m =>
JobPool group m a -> STM m a
JobPool.waitForJob JobPool Group m JobResult
jobpool)
FirstToFinish (STM m) (MonitorEvent mode m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
forall a. Semigroup a => a -> a -> a
<> STM m (MonitorEvent mode m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
forall (m :: * -> *) a. m a -> FirstToFinish m a
FirstToFinish (ControlCmd mode m -> MonitorEvent mode m
forall (mode :: Mode) (m :: * -> *).
ControlCmd mode m -> MonitorEvent mode m
EventControlCmd (ControlCmd mode m -> MonitorEvent mode m)
-> STM m (ControlCmd mode m) -> STM m (MonitorEvent mode m)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StrictTQueue m (ControlCmd mode m) -> STM m (ControlCmd mode m)
forall (m :: * -> *) a. MonadSTM m => StrictTQueue m a -> STM m a
readTQueue StrictTQueue m (ControlCmd mode m)
cmdQueue)
FirstToFinish (STM m) (MonitorEvent mode m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
forall a. Semigroup a => a -> a -> a
<> ((MiniProtocolState mode m, MiniProtocolAction m)
-> FirstToFinish (STM m) (MonitorEvent mode m))
-> Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
forall m a.
Monoid m =>
(a -> m) -> Map (MiniProtocolNum, MiniProtocolDir) a -> m
forall (t :: * -> *) m a.
(Foldable t, Monoid m) =>
(a -> m) -> t a -> m
foldMap
(\(MiniProtocolState mode m
ptclState, MiniProtocolAction m
ptclAction) ->
STM m (MonitorEvent mode m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
forall (m :: * -> *) a. m a -> FirstToFinish m a
FirstToFinish (STM m (MonitorEvent mode m)
-> FirstToFinish (STM m) (MonitorEvent mode m))
-> STM m (MonitorEvent mode m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
forall a b. (a -> b) -> a -> b
$ do
IngressQueue m -> STM m ()
checkNonEmptyQueue (MiniProtocolState mode m -> IngressQueue m
forall (mode :: Mode) (m :: * -> *).
MiniProtocolState mode m -> IngressQueue m
miniProtocolIngressQueue MiniProtocolState mode m
ptclState)
MonitorEvent mode m -> STM m (MonitorEvent mode m)
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return (MiniProtocolState mode m
-> MiniProtocolAction m -> MonitorEvent mode m
forall (mode :: Mode) (m :: * -> *).
MiniProtocolState mode m
-> MiniProtocolAction m -> MonitorEvent mode m
EventStartOnDemand MiniProtocolState mode m
ptclState MiniProtocolAction m
ptclAction)
)
Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
mcOnDemandProtocols
case result of
EventJobResult (MiniProtocolShutdown MiniProtocolNum
pnum MiniProtocolDir
pmode) -> do
Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer (MiniProtocolNum -> MiniProtocolDir -> Trace
TraceCleanExit MiniProtocolNum
pnum MiniProtocolDir
pmode)
MonitorCtx m mode -> m ()
go MonitorCtx m mode
monitorCtx
EventJobResult (MiniProtocolException MiniProtocolNum
pnum MiniProtocolDir
pmode SomeException
e) -> do
Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer (BearerState -> Trace
TraceState BearerState
Dead)
Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer (MiniProtocolNum -> MiniProtocolDir -> SomeException -> Trace
TraceExceptionExit MiniProtocolNum
pnum MiniProtocolDir
pmode SomeException
e)
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTVar m Status -> Status -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m Status
muxStatus (Status -> STM m ()) -> Status -> STM m ()
forall a b. (a -> b) -> a -> b
$ SomeException -> Status
Failed SomeException
e
SomeException -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SomeException
e
EventJobResult (MuxerException SomeException
e) -> do
Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer (BearerState -> Trace
TraceState BearerState
Dead)
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTVar m Status -> Status -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m Status
muxStatus (Status -> STM m ()) -> Status -> STM m ()
forall a b. (a -> b) -> a -> b
$ SomeException -> Status
Failed SomeException
e
SomeException -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SomeException
e
EventJobResult (DemuxerException SomeException
e) -> do
Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer (BearerState -> Trace
TraceState BearerState
Dead)
r <- STM m Bool -> m Bool
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m Bool -> m Bool) -> STM m Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ do
size <- JobPool Group m JobResult -> Group -> STM m Int
forall (m :: * -> *) group a.
(MonadSTM m, Eq group) =>
JobPool group m a -> group -> STM m Int
JobPool.readGroupSize JobPool Group m JobResult
jobpool Group
MiniProtocolJob
case size of
Int
0 | Just BearerClosed {} <- SomeException -> Maybe Error
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e
-> StrictTVar m Status -> Status -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m Status
muxStatus Status
Stopped
STM m () -> STM m Bool -> STM m Bool
forall a b. STM m a -> STM m b -> STM m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Bool -> STM m Bool
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
Int
_ -> StrictTVar m Status -> Status -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m Status
muxStatus (SomeException -> Status
Failed SomeException
e)
STM m () -> STM m Bool -> STM m Bool
forall a b. STM m a -> STM m b -> STM m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Bool -> STM m Bool
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
unless r (throwIO e)
EventControlCmd (CmdStartProtocolThread
StartOnDemandOrEagerly
StartEagerly
ptclState :: MiniProtocolState mode m
ptclState@MiniProtocolState {
miniProtocolInfo :: forall (mode :: Mode) (m :: * -> *).
MiniProtocolState mode m -> MiniProtocolInfo mode
miniProtocolInfo = MiniProtocolInfo {
MiniProtocolNum
miniProtocolNum :: forall (mode :: Mode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum,
MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: Mode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir
}
}
MiniProtocolAction m
ptclAction) -> do
Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer (MiniProtocolNum -> MiniProtocolDir -> Trace
TraceStartEagerly MiniProtocolNum
miniProtocolNum
(MiniProtocolDirection mode -> MiniProtocolDir
forall (mode :: Mode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir))
JobPool Group m JobResult -> Job Group m JobResult -> m ()
forall group (m :: * -> *) a.
(MonadAsync m, MonadMask m, Ord group) =>
JobPool group m a -> Job group m a -> m ()
JobPool.forkJob JobPool Group m JobResult
jobpool (Job Group m JobResult -> m ()) -> Job Group m JobResult -> m ()
forall a b. (a -> b) -> a -> b
$
Tracer m Trace
-> EgressQueue m
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> Job Group m JobResult
forall (mode :: Mode) (m :: * -> *).
(MonadSTM m, MonadThread m, MonadThrow (STM m)) =>
Tracer m Trace
-> EgressQueue m
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> Job Group m JobResult
miniProtocolJob
Tracer m Trace
tracer
EgressQueue m
egressQueue
MiniProtocolState mode m
ptclState
MiniProtocolAction m
ptclAction
MonitorCtx m mode -> m ()
go MonitorCtx m mode
monitorCtx
EventControlCmd (CmdStartProtocolThread
StartOnDemandOrEagerly
StartOnDemand
ptclState :: MiniProtocolState mode m
ptclState@MiniProtocolState {
miniProtocolInfo :: forall (mode :: Mode) (m :: * -> *).
MiniProtocolState mode m -> MiniProtocolInfo mode
miniProtocolInfo = MiniProtocolInfo {
MiniProtocolNum
miniProtocolNum :: forall (mode :: Mode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum,
MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: Mode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir
}
}
MiniProtocolAction m
ptclAction) -> do
let monitorCtx' :: MonitorCtx m mode
monitorCtx' = MonitorCtx m mode
monitorCtx { mcOnDemandProtocols =
Map.insert (protocolKey ptclState)
(ptclState, ptclAction)
mcOnDemandProtocols
}
Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer (MiniProtocolNum -> MiniProtocolDir -> Trace
TraceStartedOnDemand MiniProtocolNum
miniProtocolNum
(MiniProtocolDirection mode -> MiniProtocolDir
forall (mode :: Mode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir))
MonitorCtx m mode -> m ()
go MonitorCtx m mode
monitorCtx'
EventControlCmd ControlCmd mode m
CmdShutdown -> do
Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer Trace
TraceStopping
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTVar m Status -> Status -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m Status
muxStatus Status
Stopping
JobPool Group m JobResult -> Group -> m ()
forall (m :: * -> *) group a.
(MonadAsync m, Eq group) =>
JobPool group m a -> group -> m ()
JobPool.cancelGroup JobPool Group m JobResult
jobpool Group
MiniProtocolJob
_ <- DiffTime -> m () -> m (Maybe ())
TimeoutFn m
timeout DiffTime
2 (m () -> m (Maybe ())) -> m () -> m (Maybe ())
forall a b. (a -> b) -> a -> b
$
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$
EgressQueue m -> STM m (Maybe (TranslocationServiceRequest m))
forall (m :: * -> *) a.
MonadSTM m =>
StrictTBQueue m a -> STM m (Maybe a)
tryPeekTBQueue EgressQueue m
egressQueue
STM m (Maybe (TranslocationServiceRequest m))
-> (Maybe (TranslocationServiceRequest m) -> STM m ()) -> STM m ()
forall a b. STM m a -> (a -> STM m b) -> STM m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Bool -> STM m ()
forall (m :: * -> *). MonadSTM m => Bool -> STM m ()
check (Bool -> STM m ())
-> (Maybe (TranslocationServiceRequest m) -> Bool)
-> Maybe (TranslocationServiceRequest m)
-> STM m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe (TranslocationServiceRequest m) -> Bool
forall a. Maybe a -> Bool
isNothing
atomically $ writeTVar muxStatus Stopped
traceWith tracer TraceStopped
EventStartOnDemand ptclState :: MiniProtocolState mode m
ptclState@MiniProtocolState {
miniProtocolInfo :: forall (mode :: Mode) (m :: * -> *).
MiniProtocolState mode m -> MiniProtocolInfo mode
miniProtocolInfo = MiniProtocolInfo {
MiniProtocolNum
miniProtocolNum :: forall (mode :: Mode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum,
MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: Mode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir
},
StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: forall (mode :: Mode) (m :: * -> *).
MiniProtocolState mode m -> StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: StrictTVar m MiniProtocolStatus
miniProtocolStatusVar
}
MiniProtocolAction m
ptclAction -> do
Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer (MiniProtocolNum -> MiniProtocolDir -> Trace
TraceStartOnDemand MiniProtocolNum
miniProtocolNum
(MiniProtocolDirection mode -> MiniProtocolDir
forall (mode :: Mode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir))
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTVar m MiniProtocolStatus -> MiniProtocolStatus -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MiniProtocolStatus
miniProtocolStatusVar MiniProtocolStatus
StatusRunning
JobPool Group m JobResult -> Job Group m JobResult -> m ()
forall group (m :: * -> *) a.
(MonadAsync m, MonadMask m, Ord group) =>
JobPool group m a -> Job group m a -> m ()
JobPool.forkJob JobPool Group m JobResult
jobpool (Job Group m JobResult -> m ()) -> Job Group m JobResult -> m ()
forall a b. (a -> b) -> a -> b
$
Tracer m Trace
-> EgressQueue m
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> Job Group m JobResult
forall (mode :: Mode) (m :: * -> *).
(MonadSTM m, MonadThread m, MonadThrow (STM m)) =>
Tracer m Trace
-> EgressQueue m
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> Job Group m JobResult
miniProtocolJob
Tracer m Trace
tracer
EgressQueue m
egressQueue
MiniProtocolState mode m
ptclState
MiniProtocolAction m
ptclAction
let ptclKey :: (MiniProtocolNum, MiniProtocolDir)
ptclKey = MiniProtocolState mode m -> (MiniProtocolNum, MiniProtocolDir)
protocolKey MiniProtocolState mode m
ptclState
monitorCtx' :: MonitorCtx m mode
monitorCtx' = MonitorCtx m mode
monitorCtx { mcOnDemandProtocols =
Map.delete ptclKey
mcOnDemandProtocols
}
MonitorCtx m mode -> m ()
go MonitorCtx m mode
monitorCtx'
checkNonEmptyQueue :: IngressQueue m -> STM m ()
checkNonEmptyQueue :: IngressQueue m -> STM m ()
checkNonEmptyQueue IngressQueue m
q = do
buf <- IngressQueue m -> STM m ByteString
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar IngressQueue m
q
check (not (BL.null buf))
protocolKey :: MiniProtocolState mode m -> MiniProtocolKey
protocolKey :: MiniProtocolState mode m -> (MiniProtocolNum, MiniProtocolDir)
protocolKey MiniProtocolState {
miniProtocolInfo :: forall (mode :: Mode) (m :: * -> *).
MiniProtocolState mode m -> MiniProtocolInfo mode
miniProtocolInfo = MiniProtocolInfo {
MiniProtocolNum
miniProtocolNum :: forall (mode :: Mode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum,
MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: Mode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir
}
} =
(MiniProtocolNum
miniProtocolNum, MiniProtocolDirection mode -> MiniProtocolDir
forall (mode :: Mode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir)
data MonitorEvent mode m =
EventJobResult JobResult
| EventControlCmd (ControlCmd mode m)
| EventStartOnDemand (MiniProtocolState mode m)
(MiniProtocolAction m)
data JobResult =
MiniProtocolShutdown MiniProtocolNum MiniProtocolDir
| MiniProtocolException MiniProtocolNum MiniProtocolDir SomeException
| MuxerException SomeException
| DemuxerException SomeException
muxChannel
:: forall m.
( MonadSTM m
)
=> Tracer m Trace
-> EgressQueue m
-> Wanton m
-> MiniProtocolNum
-> MiniProtocolDir
-> IngressQueue m
-> ByteChannel m
muxChannel :: forall (m :: * -> *).
MonadSTM m =>
Tracer m Trace
-> EgressQueue m
-> Wanton m
-> MiniProtocolNum
-> MiniProtocolDir
-> IngressQueue m
-> ByteChannel m
muxChannel Tracer m Trace
tracer EgressQueue m
egressQueue want :: Wanton m
want@(Wanton StrictTVar m ByteString
w) MiniProtocolNum
mc MiniProtocolDir
md StrictTVar m ByteString
q =
Channel { ByteString -> m ()
send :: ByteString -> m ()
send :: ByteString -> m ()
send, m (Maybe ByteString)
recv :: m (Maybe ByteString)
recv :: m (Maybe ByteString)
recv}
where
perMiniProtocolBufferSize :: Int64
perMiniProtocolBufferSize :: Int64
perMiniProtocolBufferSize = Int64
0x3ffff
send :: BL.ByteString -> m ()
send :: ByteString -> m ()
send ByteString
encoding = do
Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer (Trace -> m ()) -> Trace -> m ()
forall a b. (a -> b) -> a -> b
$ MiniProtocolNum -> Int -> Trace
TraceChannelSendStart MiniProtocolNum
mc (Int64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> Int) -> Int64 -> Int
forall a b. (a -> b) -> a -> b
$ ByteString -> Int64
BL.length ByteString
encoding)
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
buf <- StrictTVar m ByteString -> STM m ByteString
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m ByteString
w
if BL.length buf < perMiniProtocolBufferSize
then do
let wasEmpty = ByteString -> Bool
BL.null ByteString
buf
writeTVar w (BL.append buf encoding)
when wasEmpty $
writeTBQueue egressQueue (TLSRDemand mc md want)
else retry
Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer (Trace -> m ()) -> Trace -> m ()
forall a b. (a -> b) -> a -> b
$ MiniProtocolNum -> Trace
TraceChannelSendEnd MiniProtocolNum
mc
recv :: m (Maybe BL.ByteString)
recv :: m (Maybe ByteString)
recv = do
Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer (Trace -> m ()) -> Trace -> m ()
forall a b. (a -> b) -> a -> b
$ MiniProtocolNum -> Trace
TraceChannelRecvStart MiniProtocolNum
mc
blob <- STM m ByteString -> m ByteString
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m ByteString -> m ByteString)
-> STM m ByteString -> m ByteString
forall a b. (a -> b) -> a -> b
$ do
blob <- StrictTVar m ByteString -> STM m ByteString
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m ByteString
q
if blob == BL.empty
then retry
else writeTVar q BL.empty >> return blob
traceWith tracer $ TraceChannelRecvEnd mc (fromIntegral $ BL.length blob)
return $ Just blob
traceBearerState :: Tracer m Trace -> BearerState -> m ()
traceBearerState :: forall (m :: * -> *). Tracer m Trace -> BearerState -> m ()
traceBearerState Tracer m Trace
tracer BearerState
state =
Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer (BearerState -> Trace
TraceState BearerState
state)
runMiniProtocol :: forall mode m a.
( Alternative (STM m)
, MonadSTM m
, MonadThrow m
, MonadThrow (STM m)
)
=> Mux mode m
-> MiniProtocolNum
-> MiniProtocolDirection mode
-> StartOnDemandOrEagerly
-> (ByteChannel m -> m (a, Maybe BL.ByteString))
-> m (STM m (Either SomeException a))
runMiniProtocol :: forall (mode :: Mode) (m :: * -> *) a.
(Alternative (STM m), MonadSTM m, MonadThrow m,
MonadThrow (STM m)) =>
Mux mode m
-> MiniProtocolNum
-> MiniProtocolDirection mode
-> StartOnDemandOrEagerly
-> (ByteChannel m -> m (a, Maybe ByteString))
-> m (STM m (Either SomeException a))
runMiniProtocol Mux { Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: forall (mode :: Mode) (m :: * -> *).
Mux mode m
-> Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols, StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue :: forall (mode :: Mode) (m :: * -> *).
Mux mode m -> StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue :: StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue , StrictTVar m Status
muxStatus :: forall (mode :: Mode) (m :: * -> *).
Mux mode m -> StrictTVar m Status
muxStatus :: StrictTVar m Status
muxStatus}
MiniProtocolNum
ptclNum MiniProtocolDirection mode
ptclDir StartOnDemandOrEagerly
startMode ByteChannel m -> m (a, Maybe ByteString)
protocolAction
| Just ptclState :: MiniProtocolState mode m
ptclState@MiniProtocolState{StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: forall (mode :: Mode) (m :: * -> *).
MiniProtocolState mode m -> StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: StrictTVar m MiniProtocolStatus
miniProtocolStatusVar}
<- (MiniProtocolNum, MiniProtocolDir)
-> Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
-> Maybe (MiniProtocolState mode m)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup (MiniProtocolNum
ptclNum, MiniProtocolDir
ptclDir') Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols
= STM m (STM m (Either SomeException a))
-> m (STM m (Either SomeException a))
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (STM m (Either SomeException a))
-> m (STM m (Either SomeException a)))
-> STM m (STM m (Either SomeException a))
-> m (STM m (Either SomeException a))
forall a b. (a -> b) -> a -> b
$ do
st <- StrictTVar m Status -> STM m Status
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m Status
muxStatus
case st of
Status
Stopping -> Error -> STM m ()
forall (m :: * -> *) e a.
(MonadSTM m, MonadThrow (STM m), Exception e) =>
e -> STM m a
throwSTM (Maybe SomeException -> Status -> Error
Shutdown Maybe SomeException
forall a. Maybe a
Nothing Status
st)
Status
Stopped -> Error -> STM m ()
forall (m :: * -> *) e a.
(MonadSTM m, MonadThrow (STM m), Exception e) =>
e -> STM m a
throwSTM (Maybe SomeException -> Status -> Error
Shutdown Maybe SomeException
forall a. Maybe a
Nothing Status
st)
Status
_ -> () -> STM m ()
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
status <- readTVar miniProtocolStatusVar
unless (status == StatusIdle) $
throwSTM (ProtocolAlreadyRunning ptclNum ptclDir' status)
let !status' = case StartOnDemandOrEagerly
startMode of
StartOnDemandOrEagerly
StartOnDemand -> MiniProtocolStatus
StatusStartOnDemand
StartOnDemandOrEagerly
StartEagerly -> MiniProtocolStatus
StatusRunning
writeTVar miniProtocolStatusVar status'
completionVar <- newEmptyTMVar
writeTQueue muxControlCmdQueue $
CmdStartProtocolThread
startMode
ptclState
(MiniProtocolAction protocolAction completionVar)
return $ completionAction completionVar
| Bool
otherwise
= RuntimeError -> m (STM m (Either SomeException a))
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (MiniProtocolNum -> MiniProtocolDir -> RuntimeError
UnknownProtocolInternalError MiniProtocolNum
ptclNum MiniProtocolDir
ptclDir')
where
ptclDir' :: MiniProtocolDir
ptclDir' = MiniProtocolDirection mode -> MiniProtocolDir
forall (mode :: Mode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
ptclDir
completionAction :: StrictTMVar m (Either SomeException a)
-> STM m (Either SomeException a)
completionAction StrictTMVar m (Either SomeException a)
completionVar = do
st <- StrictTVar m Status -> STM m Status
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m Status
muxStatus
case st of
Status
Ready -> StrictTMVar m (Either SomeException a)
-> STM m (Either SomeException a)
forall (m :: * -> *) a. MonadSTM m => StrictTMVar m a -> STM m a
readTMVar StrictTMVar m (Either SomeException a)
completionVar
Status
Stopping -> StrictTMVar m (Either SomeException a)
-> STM m (Either SomeException a)
forall (m :: * -> *) a. MonadSTM m => StrictTMVar m a -> STM m a
readTMVar StrictTMVar m (Either SomeException a)
completionVar
STM m (Either SomeException a)
-> STM m (Either SomeException a) -> STM m (Either SomeException a)
forall a. STM m a -> STM m a -> STM m a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> Either SomeException a -> STM m (Either SomeException a)
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return (SomeException -> Either SomeException a
forall a b. a -> Either a b
Left (SomeException -> Either SomeException a)
-> SomeException -> Either SomeException a
forall a b. (a -> b) -> a -> b
$ Error -> SomeException
forall e. Exception e => e -> SomeException
toException (Maybe SomeException -> Status -> Error
Shutdown Maybe SomeException
forall a. Maybe a
Nothing Status
st))
Status
Stopped -> StrictTMVar m (Either SomeException a)
-> STM m (Either SomeException a)
forall (m :: * -> *) a. MonadSTM m => StrictTMVar m a -> STM m a
readTMVar StrictTMVar m (Either SomeException a)
completionVar
STM m (Either SomeException a)
-> STM m (Either SomeException a) -> STM m (Either SomeException a)
forall a. STM m a -> STM m a -> STM m a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> Either SomeException a -> STM m (Either SomeException a)
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return (SomeException -> Either SomeException a
forall a b. a -> Either a b
Left (SomeException -> Either SomeException a)
-> SomeException -> Either SomeException a
forall a b. (a -> b) -> a -> b
$ Error -> SomeException
forall e. Exception e => e -> SomeException
toException (Maybe SomeException -> Status -> Error
Shutdown Maybe SomeException
forall a. Maybe a
Nothing Status
st))
Failed SomeException
e -> StrictTMVar m (Either SomeException a)
-> STM m (Either SomeException a)
forall (m :: * -> *) a. MonadSTM m => StrictTMVar m a -> STM m a
readTMVar StrictTMVar m (Either SomeException a)
completionVar
STM m (Either SomeException a)
-> STM m (Either SomeException a) -> STM m (Either SomeException a)
forall a. STM m a -> STM m a -> STM m a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> Either SomeException a -> STM m (Either SomeException a)
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return (SomeException -> Either SomeException a
forall a b. a -> Either a b
Left (SomeException -> Either SomeException a)
-> SomeException -> Either SomeException a
forall a b. (a -> b) -> a -> b
$ Error -> SomeException
forall e. Exception e => e -> SomeException
toException (Maybe SomeException -> Status -> Error
Shutdown (SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just SomeException
e) Status
st))