{-# LANGUAGE BangPatterns              #-}
{-# LANGUAGE DataKinds                 #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts          #-}
{-# LANGUAGE GADTSyntax                #-}
{-# LANGUAGE LambdaCase                #-}
{-# LANGUAGE NamedFieldPuns            #-}
{-# LANGUAGE RankNTypes                #-}
{-# LANGUAGE ScopedTypeVariables       #-}
{-# LANGUAGE TypeFamilies              #-}

-- | Network multiplexer API.
--
-- The module should be imported qualified.
module Network.Mux
  ( -- * Defining 'Mux' protocol bundles
    new
  , Mux
  , Mode (..)
  , HasInitiator
  , HasResponder
  , MiniProtocolInfo (..)
  , MiniProtocolNum (..)
  , MiniProtocolDirection (..)
  , MiniProtocolLimits (..)
    -- * Running the Mux
  , run
  , stop
    -- ** Run a mini-protocol
  , runMiniProtocol
  , StartOnDemandOrEagerly (..)
  , ByteChannel
  , Channel (..)
    -- * Bearer
  , Bearer
  , MakeBearer (..)
  , SDUSize (..)
    -- * Monitoring
  , miniProtocolStateMap
  , stopped
    -- * Errors
  , Error (..)
  , RuntimeError (..)
    -- * Tracing
  , traceBearerState
  , BearerState (..)
  , Trace (..)
  , WithBearer (..)
  ) where

import Data.ByteString.Lazy qualified as BL
import Data.Int (Int64)
import Data.Map (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe (isNothing)
import Data.Monoid.Synchronisation (FirstToFinish (..))

import Control.Applicative
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Concurrent.JobPool qualified as JobPool
import Control.Exception (SomeAsyncException (..))
import Control.Monad
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTimer.SI hiding (timeout)
import Control.Tracer

import Network.Mux.Bearer
import Network.Mux.Channel
import Network.Mux.Egress as Egress
import Network.Mux.Ingress as Ingress
import Network.Mux.Timeout
import Network.Mux.Trace
import Network.Mux.Types


-- | Mux handle which allows to control the multiplexer, e.g.
--
-- * `run`: run the multiplexer
-- * `runMiniProtocol`: start a mini-protocol (eagerly or lazily)
-- * `stop`: stop the multiplexer, causing `run` to return.
--
data Mux (mode :: Mode) m =
     Mux {
       forall (mode :: Mode) (m :: * -> *).
Mux mode m
-> Map
     (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols   :: !(Map (MiniProtocolNum, MiniProtocolDir)
                                   (MiniProtocolState mode m)),
       forall (mode :: Mode) (m :: * -> *).
Mux mode m -> StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue :: !(StrictTQueue m (ControlCmd mode m)),
       forall (mode :: Mode) (m :: * -> *).
Mux mode m -> StrictTVar m Status
muxStatus          :: StrictTVar m Status
     }


-- | Get information about all statically registered mini-protocols.
--
miniProtocolStateMap :: MonadSTM m
                     => Mux mode m
                     -> Map (MiniProtocolNum, MiniProtocolDir)
                            (STM m MiniProtocolStatus)
miniProtocolStateMap :: forall (m :: * -> *) (mode :: Mode).
MonadSTM m =>
Mux mode m
-> Map
     (MiniProtocolNum, MiniProtocolDir) (STM m MiniProtocolStatus)
miniProtocolStateMap = (MiniProtocolState mode m -> STM m MiniProtocolStatus)
-> Map
     (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
-> Map
     (MiniProtocolNum, MiniProtocolDir) (STM m MiniProtocolStatus)
forall a b.
(a -> b)
-> Map (MiniProtocolNum, MiniProtocolDir) a
-> Map (MiniProtocolNum, MiniProtocolDir) b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (StrictTVar m MiniProtocolStatus -> STM m MiniProtocolStatus
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar (StrictTVar m MiniProtocolStatus -> STM m MiniProtocolStatus)
-> (MiniProtocolState mode m -> StrictTVar m MiniProtocolStatus)
-> MiniProtocolState mode m
-> STM m MiniProtocolStatus
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MiniProtocolState mode m -> StrictTVar m MiniProtocolStatus
forall (mode :: Mode) (m :: * -> *).
MiniProtocolState mode m -> StrictTVar m MiniProtocolStatus
miniProtocolStatusVar)
                     (Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
 -> Map
      (MiniProtocolNum, MiniProtocolDir) (STM m MiniProtocolStatus))
-> (Mux mode m
    -> Map
         (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m))
-> Mux mode m
-> Map
     (MiniProtocolNum, MiniProtocolDir) (STM m MiniProtocolStatus)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Mux mode m
-> Map
     (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
forall (mode :: Mode) (m :: * -> *).
Mux mode m
-> Map
     (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols

-- | Await until mux stopped.
--
stopped :: MonadSTM m => Mux mode m -> STM m (Maybe SomeException)
stopped :: forall (m :: * -> *) (mode :: Mode).
MonadSTM m =>
Mux mode m -> STM m (Maybe SomeException)
stopped Mux { StrictTVar m Status
muxStatus :: forall (mode :: Mode) (m :: * -> *).
Mux mode m -> StrictTVar m Status
muxStatus :: StrictTVar m Status
muxStatus } =
    StrictTVar m Status -> STM m Status
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m Status
muxStatus STM m Status
-> (Status -> STM m (Maybe SomeException))
-> STM m (Maybe SomeException)
forall a b. STM m a -> (a -> STM m b) -> STM m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      Status
Ready      -> STM m (Maybe SomeException)
forall a. STM m a
forall (m :: * -> *) a. MonadSTM m => STM m a
retry
      Failed SomeException
err -> Maybe SomeException -> STM m (Maybe SomeException)
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return (SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just SomeException
err)
      Status
Stopping   -> STM m (Maybe SomeException)
forall a. STM m a
forall (m :: * -> *) a. MonadSTM m => STM m a
retry
      Status
Stopped    -> Maybe SomeException -> STM m (Maybe SomeException)
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe SomeException
forall a. Maybe a
Nothing


-- | Create a mux handle in `Mode` and register mini-protocols.
--
new :: forall (mode :: Mode) m.
       MonadLabelledSTM m
    => [MiniProtocolInfo mode]
    -- ^ description of protocols run by the mux layer.  Only these protocols
    -- one will be able to execute.
    -> m (Mux mode m)
new :: forall (mode :: Mode) (m :: * -> *).
MonadLabelledSTM m =>
[MiniProtocolInfo mode] -> m (Mux mode m)
new [MiniProtocolInfo mode]
ptcls = do
    muxMiniProtocols   <- [MiniProtocolInfo mode]
-> m (Map
        (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m))
forall (m :: * -> *) (mode :: Mode).
MonadSTM m =>
[MiniProtocolInfo mode]
-> m (Map
        (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m))
mkMiniProtocolStateMap [MiniProtocolInfo mode]
ptcls
    muxControlCmdQueue <- atomically newTQueue
    muxStatus <- newTVarIO Ready
    return Mux {
      muxMiniProtocols,
      muxControlCmdQueue,
      muxStatus
    }

mkMiniProtocolStateMap :: MonadSTM m
                       => [MiniProtocolInfo mode]
                       -> m (Map (MiniProtocolNum, MiniProtocolDir)
                                 (MiniProtocolState mode m))
mkMiniProtocolStateMap :: forall (m :: * -> *) (mode :: Mode).
MonadSTM m =>
[MiniProtocolInfo mode]
-> m (Map
        (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m))
mkMiniProtocolStateMap [MiniProtocolInfo mode]
ptcls =
    [((MiniProtocolNum, MiniProtocolDir), MiniProtocolState mode m)]
-> Map
     (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList ([((MiniProtocolNum, MiniProtocolDir), MiniProtocolState mode m)]
 -> Map
      (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m))
-> m [((MiniProtocolNum, MiniProtocolDir),
       MiniProtocolState mode m)]
-> m (Map
        (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
    [m ((MiniProtocolNum, MiniProtocolDir), MiniProtocolState mode m)]
-> m [((MiniProtocolNum, MiniProtocolDir),
       MiniProtocolState mode m)]
forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
forall (m :: * -> *) a. Monad m => [m a] -> m [a]
sequence
      [ do state <- MiniProtocolInfo mode -> m (MiniProtocolState mode m)
forall (m :: * -> *) (mode :: Mode).
MonadSTM m =>
MiniProtocolInfo mode -> m (MiniProtocolState mode m)
mkMiniProtocolState MiniProtocolInfo mode
ptcl
           return ((miniProtocolNum, protocolDirEnum miniProtocolDir), state)
      | ptcl :: MiniProtocolInfo mode
ptcl@MiniProtocolInfo {MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum :: forall (mode :: Mode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum, MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: Mode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir} <- [MiniProtocolInfo mode]
ptcls ]

mkMiniProtocolState :: MonadSTM m
                    => MiniProtocolInfo mode
                    -> m (MiniProtocolState mode m)
mkMiniProtocolState :: forall (m :: * -> *) (mode :: Mode).
MonadSTM m =>
MiniProtocolInfo mode -> m (MiniProtocolState mode m)
mkMiniProtocolState MiniProtocolInfo mode
miniProtocolInfo = do
    miniProtocolIngressQueue <- ByteString -> m (StrictTVar m ByteString)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ByteString
BL.empty
    miniProtocolStatusVar    <- newTVarIO StatusIdle
    return MiniProtocolState {
       miniProtocolInfo,
       miniProtocolIngressQueue,
       miniProtocolStatusVar
     }

-- | Shut down the mux. This will cause 'run' to return. It does not
-- wait for any protocol threads to finish, so you should do that first if
-- necessary.
--
stop :: MonadSTM m  => Mux mode m -> m ()
stop :: forall (m :: * -> *) (mode :: Mode).
MonadSTM m =>
Mux mode m -> m ()
stop Mux{StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue :: forall (mode :: Mode) (m :: * -> *).
Mux mode m -> StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue :: StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue} =
    STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTQueue m (ControlCmd mode m) -> ControlCmd mode m -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTQueue m a -> a -> STM m ()
writeTQueue StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue ControlCmd mode m
forall (mode :: Mode) (m :: * -> *). ControlCmd mode m
CmdShutdown


-- | Mux classification of 'Job's
--
data Group = MuxJob
           | MiniProtocolJob
  deriving (Group -> Group -> Bool
(Group -> Group -> Bool) -> (Group -> Group -> Bool) -> Eq Group
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: Group -> Group -> Bool
== :: Group -> Group -> Bool
$c/= :: Group -> Group -> Bool
/= :: Group -> Group -> Bool
Eq, Eq Group
Eq Group =>
(Group -> Group -> Ordering)
-> (Group -> Group -> Bool)
-> (Group -> Group -> Bool)
-> (Group -> Group -> Bool)
-> (Group -> Group -> Bool)
-> (Group -> Group -> Group)
-> (Group -> Group -> Group)
-> Ord Group
Group -> Group -> Bool
Group -> Group -> Ordering
Group -> Group -> Group
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: Group -> Group -> Ordering
compare :: Group -> Group -> Ordering
$c< :: Group -> Group -> Bool
< :: Group -> Group -> Bool
$c<= :: Group -> Group -> Bool
<= :: Group -> Group -> Bool
$c> :: Group -> Group -> Bool
> :: Group -> Group -> Bool
$c>= :: Group -> Group -> Bool
>= :: Group -> Group -> Bool
$cmax :: Group -> Group -> Group
max :: Group -> Group -> Group
$cmin :: Group -> Group -> Group
min :: Group -> Group -> Group
Ord)


-- | run starts a mux bearer for the specified protocols corresponding to
-- one of the provided Versions.
--
-- __Isometric flow control: analysis of head-of-line blocking of the ingress side of the multiplexer__
--
-- For each mini-protocol (enumerated by @ptcl@), mux will create two
-- channels. One for initiator and one for the responder.  Each channel will use
-- a single 'Wanton'.  When it is filled, it is put in a common queue
-- 'tsrQueue'.  This means that the queue is bound by @2 * |ptcl|@.  Every side
-- of a mini-protocol is served by a single 'Wanton': when an application sends
-- data, the channel will try to put it into the 'Wanton' (which might block).
-- 'Wanton's are taken from the 'tsrQueue' queue by one of mux threads.  This
-- eliminates head of line blocking: each mini-protocol thread can block on
-- putting more bytes into its 'Wanton', but it cannot block the other
-- mini-protocols or the thread that is reading the 'tsrQueue' queue.  This is
-- ensured since the 'muxChannel' will put only a non-empty 'Wanton' to the
-- 'tsrQueue' queue, and on such wantons the queue is never blocked.  This means
-- that  the only way the queue can block is when its empty, which means that
-- none of the mini-protocols wanted to send.  The egress part will read
-- a 'Wanton', take a fixed amount of bytes encode them in as an 'MuxSDU'; if
-- there are leftovers it will put them back in the 'Wanton' and place it at the
-- end of the queue (reading and writing to it will happen in a single STM
-- transaction which assures that the order of requests from a mini-protocol is
-- preserved.
--
-- Properties:
--
-- * at any given time the 'tsrQueue' contains at most one
--   'TranslocationServiceRequest' from a given mini-protocol of the given
--   'MiniProtocolDir', thus the queue contains at most @2 * |ptcl|@
--   translocation requests.
-- * at any given time each @TranslocationServiceRequest@ contains a non-empty
-- 'Wanton'
--
run :: forall m mode.
       ( MonadAsync m
       , MonadFork m
       , MonadLabelledSTM m
       , Alternative (STM m)
       , MonadThrow (STM m)
       , MonadTimer m
       , MonadMask m
       )
    => Tracer m Trace
    -> Mux mode m
    -> Bearer m
    -> m ()
run :: forall (m :: * -> *) (mode :: Mode).
(MonadAsync m, MonadFork m, MonadLabelledSTM m,
 Alternative (STM m), MonadThrow (STM m), MonadTimer m,
 MonadMask m) =>
Tracer m Trace -> Mux mode m -> Bearer m -> m ()
run Tracer m Trace
tracer Mux {Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: forall (mode :: Mode) (m :: * -> *).
Mux mode m
-> Map
     (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols, StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue :: forall (mode :: Mode) (m :: * -> *).
Mux mode m -> StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue :: StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue, StrictTVar m Status
muxStatus :: forall (mode :: Mode) (m :: * -> *).
Mux mode m -> StrictTVar m Status
muxStatus :: StrictTVar m Status
muxStatus} bearer :: Bearer m
bearer@Bearer {String
name :: String
name :: forall (m :: * -> *). Bearer m -> String
name} = do
    egressQueue <- STM m (StrictTBQueue m (TranslocationServiceRequest m))
-> m (StrictTBQueue m (TranslocationServiceRequest m))
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (StrictTBQueue m (TranslocationServiceRequest m))
 -> m (StrictTBQueue m (TranslocationServiceRequest m)))
-> STM m (StrictTBQueue m (TranslocationServiceRequest m))
-> m (StrictTBQueue m (TranslocationServiceRequest m))
forall a b. (a -> b) -> a -> b
$ Natural -> STM m (StrictTBQueue m (TranslocationServiceRequest m))
forall (m :: * -> *) a.
MonadSTM m =>
Natural -> STM m (StrictTBQueue m a)
newTBQueue Natural
100

    -- label shared variables
    labelTBQueueIO egressQueue (name ++ "-mux-egress")
    labelTVarIO muxStatus (name ++ "-mux-status")
    labelTQueueIO muxControlCmdQueue (name ++ "-mux-ctrl")

    JobPool.withJobPool
      (\JobPool Group m JobResult
jobpool -> do
        JobPool Group m JobResult -> Job Group m JobResult -> m ()
forall group (m :: * -> *) a.
(MonadAsync m, MonadMask m, Ord group) =>
JobPool group m a -> Job group m a -> m ()
JobPool.forkJob JobPool Group m JobResult
jobpool (StrictTBQueue m (TranslocationServiceRequest m)
-> Job Group m JobResult
muxerJob StrictTBQueue m (TranslocationServiceRequest m)
egressQueue)
        JobPool Group m JobResult -> Job Group m JobResult -> m ()
forall group (m :: * -> *) a.
(MonadAsync m, MonadMask m, Ord group) =>
JobPool group m a -> Job group m a -> m ()
JobPool.forkJob JobPool Group m JobResult
jobpool Job Group m JobResult
demuxerJob
        Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer (BearerState -> Trace
TraceState BearerState
Mature)

        -- Wait for someone to shut us down by calling muxStop or an error.
        -- Outstanding jobs are shut down Upon completion of withJobPool.
        (TimeoutFn m -> m ()) -> m ()
forall (m :: * -> *) b.
(MonadAsync m, MonadFork m, MonadMonotonicTime m, MonadTimer m,
 MonadMask m, MonadThrow (STM m)) =>
(TimeoutFn m -> m b) -> m b
withTimeoutSerial ((TimeoutFn m -> m ()) -> m ()) -> (TimeoutFn m -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \TimeoutFn m
timeout ->
          Tracer m Trace
-> TimeoutFn m
-> JobPool Group m JobResult
-> StrictTBQueue m (TranslocationServiceRequest m)
-> StrictTQueue m (ControlCmd mode m)
-> StrictTVar m Status
-> m ()
forall (mode :: Mode) (m :: * -> *).
(MonadAsync m, MonadMask m, Alternative (STM m),
 MonadThrow (STM m)) =>
Tracer m Trace
-> TimeoutFn m
-> JobPool Group m JobResult
-> EgressQueue m
-> StrictTQueue m (ControlCmd mode m)
-> StrictTVar m Status
-> m ()
monitor Tracer m Trace
tracer
                  DiffTime -> m a -> m (Maybe a)
TimeoutFn m
timeout
                  JobPool Group m JobResult
jobpool
                  StrictTBQueue m (TranslocationServiceRequest m)
egressQueue
                  StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue
                  StrictTVar m Status
muxStatus
      )
    -- Only handle async exceptions, 'monitor' sets 'muxStatus' before throwing
    -- an exception.  Setting 'muxStatus' is necessary to resolve a possible
    -- deadlock of mini-protocol completion action.
    m () -> (SomeAsyncException -> m ()) -> m ()
forall e a. Exception e => m a -> (e -> m a) -> m a
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` \(SomeAsyncException e
e) -> do
      STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTVar m Status -> Status -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m Status
muxStatus (SomeException -> Status
Failed (SomeException -> Status) -> SomeException -> Status
forall a b. (a -> b) -> a -> b
$ e -> SomeException
forall e. Exception e => e -> SomeException
toException e
e)
      e -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO e
e
  where
    muxerJob :: StrictTBQueue m (TranslocationServiceRequest m)
-> Job Group m JobResult
muxerJob StrictTBQueue m (TranslocationServiceRequest m)
egressQueue =
      m JobResult
-> (SomeException -> m JobResult)
-> Group
-> String
-> Job Group m JobResult
forall group (m :: * -> *) a.
m a -> (SomeException -> m a) -> group -> String -> Job group m a
JobPool.Job (StrictTBQueue m (TranslocationServiceRequest m)
-> Bearer m -> m JobResult
forall (m :: * -> *) void.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
 MonadTimer m) =>
EgressQueue m -> Bearer m -> m void
muxer StrictTBQueue m (TranslocationServiceRequest m)
egressQueue Bearer m
bearer)
                  (JobResult -> m JobResult
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (JobResult -> m JobResult)
-> (SomeException -> JobResult) -> SomeException -> m JobResult
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SomeException -> JobResult
MuxerException)
                  Group
MuxJob
                  (String
name String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
"-muxer")

    demuxerJob :: Job Group m JobResult
demuxerJob =
      m JobResult
-> (SomeException -> m JobResult)
-> Group
-> String
-> Job Group m JobResult
forall group (m :: * -> *) a.
m a -> (SomeException -> m a) -> group -> String -> Job group m a
JobPool.Job ([MiniProtocolState mode m] -> Bearer m -> m JobResult
forall (m :: * -> *) (mode :: Mode) void.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
 MonadTimer m) =>
[MiniProtocolState mode m] -> Bearer m -> m void
demuxer (Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
-> [MiniProtocolState mode m]
forall k a. Map k a -> [a]
Map.elems Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols) Bearer m
bearer)
                  (JobResult -> m JobResult
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (JobResult -> m JobResult)
-> (SomeException -> JobResult) -> SomeException -> m JobResult
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SomeException -> JobResult
DemuxerException)
                  Group
MuxJob
                  (String
name String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
"-demuxer")

-- | Mini-protocol thread executed by `JobPool` which executes `protocolAction`.
--
miniProtocolJob
  :: forall mode m.
     ( MonadSTM m
     , MonadThread m
     , MonadThrow (STM m)
     )
  => Tracer m Trace
  -> EgressQueue m
  -> MiniProtocolState mode m
  -> MiniProtocolAction m
  -> JobPool.Job Group m JobResult
miniProtocolJob :: forall (mode :: Mode) (m :: * -> *).
(MonadSTM m, MonadThread m, MonadThrow (STM m)) =>
Tracer m Trace
-> EgressQueue m
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> Job Group m JobResult
miniProtocolJob Tracer m Trace
tracer EgressQueue m
egressQueue
                MiniProtocolState {
                  miniProtocolInfo :: forall (mode :: Mode) (m :: * -> *).
MiniProtocolState mode m -> MiniProtocolInfo mode
miniProtocolInfo =
                    MiniProtocolInfo {
                      MiniProtocolNum
miniProtocolNum :: forall (mode :: Mode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum,
                      MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: Mode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir
                    },
                  IngressQueue m
miniProtocolIngressQueue :: forall (mode :: Mode) (m :: * -> *).
MiniProtocolState mode m -> IngressQueue m
miniProtocolIngressQueue :: IngressQueue m
miniProtocolIngressQueue,
                  StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: forall (mode :: Mode) (m :: * -> *).
MiniProtocolState mode m -> StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: StrictTVar m MiniProtocolStatus
miniProtocolStatusVar
                }
               MiniProtocolAction {
                 ByteChannel m -> m (a, Maybe ByteString)
miniProtocolAction :: ByteChannel m -> m (a, Maybe ByteString)
miniProtocolAction :: ()
miniProtocolAction,
                 StrictTMVar m (Either SomeException a)
completionVar :: StrictTMVar m (Either SomeException a)
completionVar :: ()
completionVar
               } =
    m JobResult
-> (SomeException -> m JobResult)
-> Group
-> String
-> Job Group m JobResult
forall group (m :: * -> *) a.
m a -> (SomeException -> m a) -> group -> String -> Job group m a
JobPool.Job m JobResult
jobAction
                SomeException -> m JobResult
jobHandler
                Group
MiniProtocolJob
                (MiniProtocolNum -> String
forall a. Show a => a -> String
show MiniProtocolNum
miniProtocolNum String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
"." String -> String -> String
forall a. [a] -> [a] -> [a]
++ MiniProtocolDir -> String
forall a. Show a => a -> String
show MiniProtocolDir
miniProtocolDirEnum)
  where
    jobAction :: m JobResult
jobAction = do
      String -> m ()
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread (case MiniProtocolNum
miniProtocolNum of
                        MiniProtocolNum Word16
a -> String
"prtcl-" String -> String -> String
forall a. [a] -> [a] -> [a]
++ Word16 -> String
forall a. Show a => a -> String
show Word16
a)
      w <- ByteString -> m (IngressQueue m)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ByteString
BL.empty
      let chan = Tracer m Trace
-> EgressQueue m
-> Wanton m
-> MiniProtocolNum
-> MiniProtocolDir
-> IngressQueue m
-> ByteChannel m
forall (m :: * -> *).
MonadSTM m =>
Tracer m Trace
-> EgressQueue m
-> Wanton m
-> MiniProtocolNum
-> MiniProtocolDir
-> IngressQueue m
-> ByteChannel m
muxChannel Tracer m Trace
tracer EgressQueue m
egressQueue (IngressQueue m -> Wanton m
forall (m :: * -> *). StrictTVar m ByteString -> Wanton m
Wanton IngressQueue m
w)
                            MiniProtocolNum
miniProtocolNum MiniProtocolDir
miniProtocolDirEnum
                            IngressQueue m
miniProtocolIngressQueue
      (result, remainder) <- miniProtocolAction chan
      traceWith tracer (TraceTerminating miniProtocolNum miniProtocolDirEnum)
      atomically $ do
        -- The Wanton w is the SDUs that are queued but not yet sent for this job.
        -- Job threads will be prevented from exiting until all their SDUs have been
        -- transmitted unless an exception/error is encountered. In that case all
        -- jobs will be cancelled directly.
        readTVar w >>= check . BL.null
        writeTVar miniProtocolStatusVar StatusIdle
        putTMVar completionVar (Right result)
          `orElse` throwSTM (BlockedOnCompletionVar miniProtocolNum)
        case remainder of
          Just ByteString
trailing ->
            IngressQueue m -> (ByteString -> ByteString) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> (a -> a) -> STM m ()
modifyTVar IngressQueue m
miniProtocolIngressQueue (ByteString -> ByteString -> ByteString
BL.append ByteString
trailing)
          Maybe ByteString
Nothing ->
            () -> STM m ()
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

      return (MiniProtocolShutdown miniProtocolNum miniProtocolDirEnum)

    jobHandler :: SomeException -> m JobResult
    jobHandler :: SomeException -> m JobResult
jobHandler SomeException
e = do
      STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$
        StrictTMVar m (Either SomeException a)
-> Either SomeException a -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTMVar m a -> a -> STM m ()
putTMVar StrictTMVar m (Either SomeException a)
completionVar (SomeException -> Either SomeException a
forall a b. a -> Either a b
Left SomeException
e)
        STM m () -> STM m () -> STM m ()
forall a. STM m a -> STM m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => STM m a -> STM m a -> STM m a
`orElse`
        RuntimeError -> STM m ()
forall (m :: * -> *) e a.
(MonadSTM m, MonadThrow (STM m), Exception e) =>
e -> STM m a
throwSTM (MiniProtocolNum -> RuntimeError
BlockedOnCompletionVar MiniProtocolNum
miniProtocolNum)
      JobResult -> m JobResult
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (MiniProtocolNum -> MiniProtocolDir -> SomeException -> JobResult
MiniProtocolException MiniProtocolNum
miniProtocolNum MiniProtocolDir
miniProtocolDirEnum SomeException
e)

    miniProtocolDirEnum :: MiniProtocolDir
    miniProtocolDirEnum :: MiniProtocolDir
miniProtocolDirEnum = MiniProtocolDirection mode -> MiniProtocolDir
forall (mode :: Mode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir

data ControlCmd mode m =
     CmdStartProtocolThread
       !StartOnDemandOrEagerly
       !(MiniProtocolState mode m)
       !(MiniProtocolAction m)
   | CmdShutdown

-- | Strategy how to start a mini-protocol.
--
data StartOnDemandOrEagerly =
    -- | Start a mini-protocol promptly.
    StartEagerly
    -- | Start a mini-protocol when data is received for the given
    -- mini-protocol.  Must be used only when initial message is sent by the
    -- remote side.
  | StartOnDemand
  deriving StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool
(StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool)
-> (StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool)
-> Eq StartOnDemandOrEagerly
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool
== :: StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool
$c/= :: StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool
/= :: StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool
Eq

data MiniProtocolAction m where
    MiniProtocolAction :: forall m a.
      { ()
miniProtocolAction :: ByteChannel m -> m (a, Maybe BL.ByteString),
        -- ^ mini-protocol action
        ()
completionVar      :: StrictTMVar m (Either SomeException a)
        -- ^ Completion var
      }
      -> MiniProtocolAction m

type MiniProtocolKey = (MiniProtocolNum, MiniProtocolDir)

newtype MonitorCtx m mode = MonitorCtx {
    -- | Mini-Protocols started on demand and waiting to be scheduled.
    --
    forall (m :: * -> *) (mode :: Mode).
MonitorCtx m mode
-> Map
     (MiniProtocolNum, MiniProtocolDir)
     (MiniProtocolState mode m, MiniProtocolAction m)
mcOnDemandProtocols :: Map MiniProtocolKey
                               (MiniProtocolState mode m, MiniProtocolAction m)

  }

-- | The monitoring loop does two jobs:
--
--  1. It waits for mini-protocol threads to terminate.
--  2. It starts responder protocol threads on demand when the first
--     incoming message arrives.
--
monitor :: forall mode m.
           ( MonadAsync m
           , MonadMask m
           , Alternative (STM m)
           , MonadThrow (STM m)
           )
        => Tracer m Trace
        -> TimeoutFn m
        -> JobPool.JobPool Group m JobResult
        -> EgressQueue m
        -> StrictTQueue m (ControlCmd mode m)
        -> StrictTVar m Status
        -> m ()
monitor :: forall (mode :: Mode) (m :: * -> *).
(MonadAsync m, MonadMask m, Alternative (STM m),
 MonadThrow (STM m)) =>
Tracer m Trace
-> TimeoutFn m
-> JobPool Group m JobResult
-> EgressQueue m
-> StrictTQueue m (ControlCmd mode m)
-> StrictTVar m Status
-> m ()
monitor Tracer m Trace
tracer TimeoutFn m
timeout JobPool Group m JobResult
jobpool EgressQueue m
egressQueue StrictTQueue m (ControlCmd mode m)
cmdQueue StrictTVar m Status
muxStatus =
    MonitorCtx m mode -> m ()
go (Map
  (MiniProtocolNum, MiniProtocolDir)
  (MiniProtocolState mode m, MiniProtocolAction m)
-> MonitorCtx m mode
forall (m :: * -> *) (mode :: Mode).
Map
  (MiniProtocolNum, MiniProtocolDir)
  (MiniProtocolState mode m, MiniProtocolAction m)
-> MonitorCtx m mode
MonitorCtx Map
  (MiniProtocolNum, MiniProtocolDir)
  (MiniProtocolState mode m, MiniProtocolAction m)
forall k a. Map k a
Map.empty)
  where
    go :: MonitorCtx m mode -> m ()
    go :: MonitorCtx m mode -> m ()
go !monitorCtx :: MonitorCtx m mode
monitorCtx@MonitorCtx { Map
  (MiniProtocolNum, MiniProtocolDir)
  (MiniProtocolState mode m, MiniProtocolAction m)
mcOnDemandProtocols :: forall (m :: * -> *) (mode :: Mode).
MonitorCtx m mode
-> Map
     (MiniProtocolNum, MiniProtocolDir)
     (MiniProtocolState mode m, MiniProtocolAction m)
mcOnDemandProtocols :: Map
  (MiniProtocolNum, MiniProtocolDir)
  (MiniProtocolState mode m, MiniProtocolAction m)
mcOnDemandProtocols } = do
      result <- STM m (MonitorEvent mode m) -> m (MonitorEvent mode m)
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (MonitorEvent mode m) -> m (MonitorEvent mode m))
-> STM m (MonitorEvent mode m) -> m (MonitorEvent mode m)
forall a b. (a -> b) -> a -> b
$ FirstToFinish (STM m) (MonitorEvent mode m)
-> STM m (MonitorEvent mode m)
forall (m :: * -> *) a. FirstToFinish m a -> m a
runFirstToFinish (FirstToFinish (STM m) (MonitorEvent mode m)
 -> STM m (MonitorEvent mode m))
-> FirstToFinish (STM m) (MonitorEvent mode m)
-> STM m (MonitorEvent mode m)
forall a b. (a -> b) -> a -> b
$
            -- wait for a mini-protocol thread to terminate
           STM m (MonitorEvent mode m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
forall (m :: * -> *) a. m a -> FirstToFinish m a
FirstToFinish (JobResult -> MonitorEvent mode m
forall (mode :: Mode) (m :: * -> *).
JobResult -> MonitorEvent mode m
EventJobResult (JobResult -> MonitorEvent mode m)
-> STM m JobResult -> STM m (MonitorEvent mode m)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> JobPool Group m JobResult -> STM m JobResult
forall (m :: * -> *) group a.
MonadSTM m =>
JobPool group m a -> STM m a
JobPool.waitForJob JobPool Group m JobResult
jobpool)

            -- wait for a new control command
        FirstToFinish (STM m) (MonitorEvent mode m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
forall a. Semigroup a => a -> a -> a
<> STM m (MonitorEvent mode m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
forall (m :: * -> *) a. m a -> FirstToFinish m a
FirstToFinish (ControlCmd mode m -> MonitorEvent mode m
forall (mode :: Mode) (m :: * -> *).
ControlCmd mode m -> MonitorEvent mode m
EventControlCmd (ControlCmd mode m -> MonitorEvent mode m)
-> STM m (ControlCmd mode m) -> STM m (MonitorEvent mode m)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StrictTQueue m (ControlCmd mode m) -> STM m (ControlCmd mode m)
forall (m :: * -> *) a. MonadSTM m => StrictTQueue m a -> STM m a
readTQueue StrictTQueue m (ControlCmd mode m)
cmdQueue)

            -- or wait for data to arrive on the channels that do not yet have
            -- responder threads running
        FirstToFinish (STM m) (MonitorEvent mode m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
forall a. Semigroup a => a -> a -> a
<> ((MiniProtocolState mode m, MiniProtocolAction m)
 -> FirstToFinish (STM m) (MonitorEvent mode m))
-> Map
     (MiniProtocolNum, MiniProtocolDir)
     (MiniProtocolState mode m, MiniProtocolAction m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
forall m a.
Monoid m =>
(a -> m) -> Map (MiniProtocolNum, MiniProtocolDir) a -> m
forall (t :: * -> *) m a.
(Foldable t, Monoid m) =>
(a -> m) -> t a -> m
foldMap
             (\(MiniProtocolState mode m
ptclState, MiniProtocolAction m
ptclAction) ->
               STM m (MonitorEvent mode m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
forall (m :: * -> *) a. m a -> FirstToFinish m a
FirstToFinish (STM m (MonitorEvent mode m)
 -> FirstToFinish (STM m) (MonitorEvent mode m))
-> STM m (MonitorEvent mode m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
forall a b. (a -> b) -> a -> b
$ do
                 IngressQueue m -> STM m ()
checkNonEmptyQueue (MiniProtocolState mode m -> IngressQueue m
forall (mode :: Mode) (m :: * -> *).
MiniProtocolState mode m -> IngressQueue m
miniProtocolIngressQueue MiniProtocolState mode m
ptclState)
                 MonitorEvent mode m -> STM m (MonitorEvent mode m)
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return (MiniProtocolState mode m
-> MiniProtocolAction m -> MonitorEvent mode m
forall (mode :: Mode) (m :: * -> *).
MiniProtocolState mode m
-> MiniProtocolAction m -> MonitorEvent mode m
EventStartOnDemand MiniProtocolState mode m
ptclState MiniProtocolAction m
ptclAction)
             )
             Map
  (MiniProtocolNum, MiniProtocolDir)
  (MiniProtocolState mode m, MiniProtocolAction m)
mcOnDemandProtocols

      case result of
        -- Protocols that runs to completion are not automatically restarted.
        EventJobResult (MiniProtocolShutdown MiniProtocolNum
pnum MiniProtocolDir
pmode) -> do
          Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer (MiniProtocolNum -> MiniProtocolDir -> Trace
TraceCleanExit MiniProtocolNum
pnum MiniProtocolDir
pmode)
          MonitorCtx m mode -> m ()
go MonitorCtx m mode
monitorCtx

        EventJobResult (MiniProtocolException MiniProtocolNum
pnum MiniProtocolDir
pmode SomeException
e) -> do
          Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer (BearerState -> Trace
TraceState BearerState
Dead)
          Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer (MiniProtocolNum -> MiniProtocolDir -> SomeException -> Trace
TraceExceptionExit MiniProtocolNum
pnum MiniProtocolDir
pmode SomeException
e)
          STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTVar m Status -> Status -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m Status
muxStatus (Status -> STM m ()) -> Status -> STM m ()
forall a b. (a -> b) -> a -> b
$ SomeException -> Status
Failed SomeException
e
          SomeException -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SomeException
e

        -- These two cover internal and protocol errors.  The muxer exception is
        -- always fatal.  The demuxer exception 'BearerClosed' when all
        -- mini-protocols stopped indicates a normal shutdown and thus it is not
        -- propagated.
        --
        -- TODO: decide if we should have exception wrappers here to identify
        -- the source of the failure, e.g. specific mini-protocol. If we're
        -- propagating exceptions, we don't need to log them.
        EventJobResult (MuxerException SomeException
e) -> do
          Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer (BearerState -> Trace
TraceState BearerState
Dead)
          STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTVar m Status -> Status -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m Status
muxStatus (Status -> STM m ()) -> Status -> STM m ()
forall a b. (a -> b) -> a -> b
$ SomeException -> Status
Failed SomeException
e
          SomeException -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SomeException
e
        EventJobResult (DemuxerException SomeException
e) -> do
          Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer (BearerState -> Trace
TraceState BearerState
Dead)
          r <- STM m Bool -> m Bool
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m Bool -> m Bool) -> STM m Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ do
            size <- JobPool Group m JobResult -> Group -> STM m Int
forall (m :: * -> *) group a.
(MonadSTM m, Eq group) =>
JobPool group m a -> group -> STM m Int
JobPool.readGroupSize JobPool Group m JobResult
jobpool Group
MiniProtocolJob
            case size of
              Int
0  | Just BearerClosed {} <- SomeException -> Maybe Error
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e
                -> StrictTVar m Status -> Status -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m Status
muxStatus Status
Stopped
                STM m () -> STM m Bool -> STM m Bool
forall a b. STM m a -> STM m b -> STM m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Bool -> STM m Bool
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
              Int
_ -> StrictTVar m Status -> Status -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m Status
muxStatus (SomeException -> Status
Failed SomeException
e)
                STM m () -> STM m Bool -> STM m Bool
forall a b. STM m a -> STM m b -> STM m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Bool -> STM m Bool
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
          unless r (throwIO e)

        EventControlCmd (CmdStartProtocolThread
                           StartOnDemandOrEagerly
StartEagerly
                           ptclState :: MiniProtocolState mode m
ptclState@MiniProtocolState {
                             miniProtocolInfo :: forall (mode :: Mode) (m :: * -> *).
MiniProtocolState mode m -> MiniProtocolInfo mode
miniProtocolInfo = MiniProtocolInfo {
                               MiniProtocolNum
miniProtocolNum :: forall (mode :: Mode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum,
                               MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: Mode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir
                             }
                           }
                           MiniProtocolAction m
ptclAction) -> do
          Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer (MiniProtocolNum -> MiniProtocolDir -> Trace
TraceStartEagerly MiniProtocolNum
miniProtocolNum
                             (MiniProtocolDirection mode -> MiniProtocolDir
forall (mode :: Mode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir))
          JobPool Group m JobResult -> Job Group m JobResult -> m ()
forall group (m :: * -> *) a.
(MonadAsync m, MonadMask m, Ord group) =>
JobPool group m a -> Job group m a -> m ()
JobPool.forkJob JobPool Group m JobResult
jobpool (Job Group m JobResult -> m ()) -> Job Group m JobResult -> m ()
forall a b. (a -> b) -> a -> b
$
            Tracer m Trace
-> EgressQueue m
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> Job Group m JobResult
forall (mode :: Mode) (m :: * -> *).
(MonadSTM m, MonadThread m, MonadThrow (STM m)) =>
Tracer m Trace
-> EgressQueue m
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> Job Group m JobResult
miniProtocolJob
              Tracer m Trace
tracer
              EgressQueue m
egressQueue
              MiniProtocolState mode m
ptclState
              MiniProtocolAction m
ptclAction
          MonitorCtx m mode -> m ()
go MonitorCtx m mode
monitorCtx

        EventControlCmd (CmdStartProtocolThread
                           StartOnDemandOrEagerly
StartOnDemand
                           ptclState :: MiniProtocolState mode m
ptclState@MiniProtocolState {
                             miniProtocolInfo :: forall (mode :: Mode) (m :: * -> *).
MiniProtocolState mode m -> MiniProtocolInfo mode
miniProtocolInfo = MiniProtocolInfo {
                               MiniProtocolNum
miniProtocolNum :: forall (mode :: Mode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum,
                               MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: Mode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir
                             }
                           }
                           MiniProtocolAction m
ptclAction) -> do
          let monitorCtx' :: MonitorCtx m mode
monitorCtx' = MonitorCtx m mode
monitorCtx { mcOnDemandProtocols =
                                           Map.insert (protocolKey ptclState)
                                                      (ptclState, ptclAction)
                                                      mcOnDemandProtocols
                                       }
          Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer (MiniProtocolNum -> MiniProtocolDir -> Trace
TraceStartedOnDemand MiniProtocolNum
miniProtocolNum
                             (MiniProtocolDirection mode -> MiniProtocolDir
forall (mode :: Mode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir))
          MonitorCtx m mode -> m ()
go MonitorCtx m mode
monitorCtx'

        EventControlCmd ControlCmd mode m
CmdShutdown -> do
          Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer Trace
TraceStopping
          STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTVar m Status -> Status -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m Status
muxStatus Status
Stopping
          JobPool Group m JobResult -> Group -> m ()
forall (m :: * -> *) group a.
(MonadAsync m, Eq group) =>
JobPool group m a -> group -> m ()
JobPool.cancelGroup JobPool Group m JobResult
jobpool Group
MiniProtocolJob
          -- wait for 2 seconds before the egress queue is drained
          _ <- DiffTime -> m () -> m (Maybe ())
TimeoutFn m
timeout DiffTime
2 (m () -> m (Maybe ())) -> m () -> m (Maybe ())
forall a b. (a -> b) -> a -> b
$
            STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$
                  EgressQueue m -> STM m (Maybe (TranslocationServiceRequest m))
forall (m :: * -> *) a.
MonadSTM m =>
StrictTBQueue m a -> STM m (Maybe a)
tryPeekTBQueue EgressQueue m
egressQueue
              STM m (Maybe (TranslocationServiceRequest m))
-> (Maybe (TranslocationServiceRequest m) -> STM m ()) -> STM m ()
forall a b. STM m a -> (a -> STM m b) -> STM m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Bool -> STM m ()
forall (m :: * -> *). MonadSTM m => Bool -> STM m ()
check (Bool -> STM m ())
-> (Maybe (TranslocationServiceRequest m) -> Bool)
-> Maybe (TranslocationServiceRequest m)
-> STM m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe (TranslocationServiceRequest m) -> Bool
forall a. Maybe a -> Bool
isNothing
          atomically $ writeTVar muxStatus Stopped
          traceWith tracer TraceStopped
          -- by exiting the 'monitor' loop we let the job pool kill demuxer and
          -- muxer threads

        -- Data has arrived on a channel for a mini-protocol for which we have
        -- an on-demand-start protocol thread. So we start it now.
        EventStartOnDemand ptclState :: MiniProtocolState mode m
ptclState@MiniProtocolState {
                             miniProtocolInfo :: forall (mode :: Mode) (m :: * -> *).
MiniProtocolState mode m -> MiniProtocolInfo mode
miniProtocolInfo = MiniProtocolInfo {
                               MiniProtocolNum
miniProtocolNum :: forall (mode :: Mode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum,
                               MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: Mode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir
                             },
                             StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: forall (mode :: Mode) (m :: * -> *).
MiniProtocolState mode m -> StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: StrictTVar m MiniProtocolStatus
miniProtocolStatusVar
                           }
                           MiniProtocolAction m
ptclAction -> do
          Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer (MiniProtocolNum -> MiniProtocolDir -> Trace
TraceStartOnDemand MiniProtocolNum
miniProtocolNum
                             (MiniProtocolDirection mode -> MiniProtocolDir
forall (mode :: Mode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir))
          STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTVar m MiniProtocolStatus -> MiniProtocolStatus -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MiniProtocolStatus
miniProtocolStatusVar MiniProtocolStatus
StatusRunning
          JobPool Group m JobResult -> Job Group m JobResult -> m ()
forall group (m :: * -> *) a.
(MonadAsync m, MonadMask m, Ord group) =>
JobPool group m a -> Job group m a -> m ()
JobPool.forkJob JobPool Group m JobResult
jobpool (Job Group m JobResult -> m ()) -> Job Group m JobResult -> m ()
forall a b. (a -> b) -> a -> b
$
            Tracer m Trace
-> EgressQueue m
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> Job Group m JobResult
forall (mode :: Mode) (m :: * -> *).
(MonadSTM m, MonadThread m, MonadThrow (STM m)) =>
Tracer m Trace
-> EgressQueue m
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> Job Group m JobResult
miniProtocolJob
              Tracer m Trace
tracer
              EgressQueue m
egressQueue
              MiniProtocolState mode m
ptclState
              MiniProtocolAction m
ptclAction
          let ptclKey :: (MiniProtocolNum, MiniProtocolDir)
ptclKey = MiniProtocolState mode m -> (MiniProtocolNum, MiniProtocolDir)
protocolKey MiniProtocolState mode m
ptclState
              monitorCtx' :: MonitorCtx m mode
monitorCtx' = MonitorCtx m mode
monitorCtx { mcOnDemandProtocols =
                                           Map.delete ptclKey
                                                      mcOnDemandProtocols
                                       }
          MonitorCtx m mode -> m ()
go MonitorCtx m mode
monitorCtx'

    checkNonEmptyQueue :: IngressQueue m -> STM m ()
    checkNonEmptyQueue :: IngressQueue m -> STM m ()
checkNonEmptyQueue IngressQueue m
q = do
      buf <- IngressQueue m -> STM m ByteString
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar IngressQueue m
q
      check (not (BL.null buf))

    protocolKey :: MiniProtocolState mode m -> MiniProtocolKey
    protocolKey :: MiniProtocolState mode m -> (MiniProtocolNum, MiniProtocolDir)
protocolKey MiniProtocolState {
                  miniProtocolInfo :: forall (mode :: Mode) (m :: * -> *).
MiniProtocolState mode m -> MiniProtocolInfo mode
miniProtocolInfo = MiniProtocolInfo {
                    MiniProtocolNum
miniProtocolNum :: forall (mode :: Mode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum,
                    MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: Mode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir
                  }
                } =
      (MiniProtocolNum
miniProtocolNum, MiniProtocolDirection mode -> MiniProtocolDir
forall (mode :: Mode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir)

data MonitorEvent mode m =
     EventJobResult  JobResult
   | EventControlCmd (ControlCmd mode m)
   | EventStartOnDemand (MiniProtocolState mode m)
                        (MiniProtocolAction m)

-- | The mux forks off a number of threads and its main thread waits and
-- monitors them all. This type covers the different thread and their possible
-- termination behaviour.
--
data JobResult =

       -- | A mini-protocol thread terminated with a result.
       --
       MiniProtocolShutdown MiniProtocolNum MiniProtocolDir

       -- | A mini-protocol thread terminated with an exception. We always
       -- respond by terminating the whole mux.
     | MiniProtocolException MiniProtocolNum MiniProtocolDir SomeException

       -- | Exception in the 'mux' thread. Always fatal.
     | MuxerException   SomeException

       -- | Exception in the 'demux' thread. Always fatal.
     | DemuxerException SomeException


-- | muxChannel creates a duplex channel for a specific 'MiniProtocolId' and
-- 'MiniProtocolDir'.
--
muxChannel
    :: forall m.
       ( MonadSTM m
       )
    => Tracer m Trace
    -> EgressQueue m
    -> Wanton m
    -> MiniProtocolNum
    -> MiniProtocolDir
    -> IngressQueue m
    -> ByteChannel m
muxChannel :: forall (m :: * -> *).
MonadSTM m =>
Tracer m Trace
-> EgressQueue m
-> Wanton m
-> MiniProtocolNum
-> MiniProtocolDir
-> IngressQueue m
-> ByteChannel m
muxChannel Tracer m Trace
tracer EgressQueue m
egressQueue want :: Wanton m
want@(Wanton StrictTVar m ByteString
w) MiniProtocolNum
mc MiniProtocolDir
md StrictTVar m ByteString
q =
    Channel { ByteString -> m ()
send :: ByteString -> m ()
send :: ByteString -> m ()
send, m (Maybe ByteString)
recv :: m (Maybe ByteString)
recv :: m (Maybe ByteString)
recv}
  where
    -- Limit for the message buffer between send and mux thread.
    perMiniProtocolBufferSize :: Int64
    perMiniProtocolBufferSize :: Int64
perMiniProtocolBufferSize = Int64
0x3ffff

    send :: BL.ByteString -> m ()
    send :: ByteString -> m ()
send ByteString
encoding = do
        -- We send CBOR encoded messages by encoding them into by ByteString
        -- forwarding them to the 'mux' thread, see 'Desired servicing semantics'.

        Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer (Trace -> m ()) -> Trace -> m ()
forall a b. (a -> b) -> a -> b
$ MiniProtocolNum -> Int -> Trace
TraceChannelSendStart MiniProtocolNum
mc (Int64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> Int) -> Int64 -> Int
forall a b. (a -> b) -> a -> b
$ ByteString -> Int64
BL.length ByteString
encoding)

        STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
            buf <- StrictTVar m ByteString -> STM m ByteString
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m ByteString
w
            if BL.length buf < perMiniProtocolBufferSize
               then do
                   let wasEmpty = ByteString -> Bool
BL.null ByteString
buf
                   writeTVar w (BL.append buf encoding)
                   when wasEmpty $
                     writeTBQueue egressQueue (TLSRDemand mc md want)
               else retry

        Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer (Trace -> m ()) -> Trace -> m ()
forall a b. (a -> b) -> a -> b
$ MiniProtocolNum -> Trace
TraceChannelSendEnd MiniProtocolNum
mc

    recv :: m (Maybe BL.ByteString)
    recv :: m (Maybe ByteString)
recv = do
        -- We receive CBOR encoded messages as ByteStrings (possibly partial) from the
        -- matching ingress queue. This is the same queue the 'demux' thread writes to.
        Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer (Trace -> m ()) -> Trace -> m ()
forall a b. (a -> b) -> a -> b
$ MiniProtocolNum -> Trace
TraceChannelRecvStart MiniProtocolNum
mc
        blob <- STM m ByteString -> m ByteString
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m ByteString -> m ByteString)
-> STM m ByteString -> m ByteString
forall a b. (a -> b) -> a -> b
$ do
            blob <- StrictTVar m ByteString -> STM m ByteString
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m ByteString
q
            if blob == BL.empty
                then retry
                else writeTVar q BL.empty >> return blob
        -- say $ printf "recv mid %s mode %s blob len %d" (show mid) (show md) (BL.length blob)
        traceWith tracer $ TraceChannelRecvEnd mc (fromIntegral $ BL.length blob)
        return $ Just blob

traceBearerState :: Tracer m Trace -> BearerState -> m ()
traceBearerState :: forall (m :: * -> *). Tracer m Trace -> BearerState -> m ()
traceBearerState Tracer m Trace
tracer BearerState
state =
    Tracer m Trace -> Trace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m Trace
tracer (BearerState -> Trace
TraceState BearerState
state)


--
-- Starting mini-protocol threads
--

-- | Arrange to run a protocol thread (for a particular 'MiniProtocolNum' and
-- 'MiniProtocolDirection') to interact on this protocol's 'Channel'.
--
-- The protocol thread can either be started eagerly or on-demand:
--
-- * With 'StartEagerly', the thread is started promptly. This is appropriate
--   for mini-protocols where the opening message may be sent by this thread.
--
-- * With 'StartOnDemand', the thread is not started until the first data is
--   received for this mini-protocol. This is appropriate for mini-protocols
--   where the opening message is sent by the remote peer.
--
-- The result is a STM action to block and wait on the protocol completion.
-- It is safe to call this completion action multiple times: it will always
-- return the same result once the protocol thread completes.
-- In case the Mux has stopped, either due to an exception or because of a call
-- to muxStop a `Left Error` will be returned from the STM action.
--
-- It is an error to start a new protocol thread while one is still running,
-- for the same 'MiniProtocolNum' and 'MiniProtocolDirection'. This can easily be
-- avoided by using the STM completion action to wait for the previous one to
-- finish.
--
-- It is safe to ask to start a protocol thread before 'run'. In this case
-- the protocol thread will not actually start until 'run' is called,
-- irrespective of the 'StartOnDemandOrEagerly' value.
--
runMiniProtocol :: forall mode m a.
                   ( Alternative (STM m)
                   , MonadSTM   m
                   , MonadThrow m
                   , MonadThrow (STM m)
                   )
                => Mux mode m
                -> MiniProtocolNum
                -> MiniProtocolDirection mode
                -> StartOnDemandOrEagerly
                -> (ByteChannel m -> m (a, Maybe BL.ByteString))
                -> m (STM m (Either SomeException a))
runMiniProtocol :: forall (mode :: Mode) (m :: * -> *) a.
(Alternative (STM m), MonadSTM m, MonadThrow m,
 MonadThrow (STM m)) =>
Mux mode m
-> MiniProtocolNum
-> MiniProtocolDirection mode
-> StartOnDemandOrEagerly
-> (ByteChannel m -> m (a, Maybe ByteString))
-> m (STM m (Either SomeException a))
runMiniProtocol Mux { Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: forall (mode :: Mode) (m :: * -> *).
Mux mode m
-> Map
     (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols, StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue :: forall (mode :: Mode) (m :: * -> *).
Mux mode m -> StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue :: StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue , StrictTVar m Status
muxStatus :: forall (mode :: Mode) (m :: * -> *).
Mux mode m -> StrictTVar m Status
muxStatus :: StrictTVar m Status
muxStatus}
                MiniProtocolNum
ptclNum MiniProtocolDirection mode
ptclDir StartOnDemandOrEagerly
startMode ByteChannel m -> m (a, Maybe ByteString)
protocolAction

    -- Ensure the mini-protocol is known and get the status var
  | Just ptclState :: MiniProtocolState mode m
ptclState@MiniProtocolState{StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: forall (mode :: Mode) (m :: * -> *).
MiniProtocolState mode m -> StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: StrictTVar m MiniProtocolStatus
miniProtocolStatusVar}
      <- (MiniProtocolNum, MiniProtocolDir)
-> Map
     (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
-> Maybe (MiniProtocolState mode m)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup (MiniProtocolNum
ptclNum, MiniProtocolDir
ptclDir') Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols

  = STM m (STM m (Either SomeException a))
-> m (STM m (Either SomeException a))
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (STM m (Either SomeException a))
 -> m (STM m (Either SomeException a)))
-> STM m (STM m (Either SomeException a))
-> m (STM m (Either SomeException a))
forall a b. (a -> b) -> a -> b
$ do
      st <- StrictTVar m Status -> STM m Status
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m Status
muxStatus
      case st of
        Status
Stopping -> Error -> STM m ()
forall (m :: * -> *) e a.
(MonadSTM m, MonadThrow (STM m), Exception e) =>
e -> STM m a
throwSTM (Maybe SomeException -> Status -> Error
Shutdown Maybe SomeException
forall a. Maybe a
Nothing Status
st)
        Status
Stopped  -> Error -> STM m ()
forall (m :: * -> *) e a.
(MonadSTM m, MonadThrow (STM m), Exception e) =>
e -> STM m a
throwSTM (Maybe SomeException -> Status -> Error
Shutdown Maybe SomeException
forall a. Maybe a
Nothing Status
st)
        Status
_        -> () -> STM m ()
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()

      -- Make sure no thread is currently running, and update the status to
      -- indicate a thread is running (or ready to start on demand)
      status <- readTVar miniProtocolStatusVar
      unless (status == StatusIdle) $
        throwSTM (ProtocolAlreadyRunning ptclNum ptclDir' status)
      let !status' = case StartOnDemandOrEagerly
startMode of
                       StartOnDemandOrEagerly
StartOnDemand -> MiniProtocolStatus
StatusStartOnDemand
                       StartOnDemandOrEagerly
StartEagerly  -> MiniProtocolStatus
StatusRunning
      writeTVar miniProtocolStatusVar status'

      -- Tell the mux control to start the thread
      completionVar <- newEmptyTMVar
      writeTQueue muxControlCmdQueue $
        CmdStartProtocolThread
          startMode
          ptclState
          (MiniProtocolAction protocolAction completionVar)

      return $ completionAction completionVar

    -- It is a programmer error to get the wrong protocol, but this is also
    -- very easy to avoid.
  | Bool
otherwise
  = RuntimeError -> m (STM m (Either SomeException a))
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (MiniProtocolNum -> MiniProtocolDir -> RuntimeError
UnknownProtocolInternalError MiniProtocolNum
ptclNum MiniProtocolDir
ptclDir')
  where
    ptclDir' :: MiniProtocolDir
ptclDir' = MiniProtocolDirection mode -> MiniProtocolDir
forall (mode :: Mode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
ptclDir

    -- Wait for the miniprotocol to complete.
    -- If the mux was stopped through a call to 'stop' (Stopped)
    -- or in case of an error (Failed) we return the result of
    -- the miniprotocol, or a `Error` if it was still running.
    completionAction :: StrictTMVar m (Either SomeException a)
-> STM m (Either SomeException a)
completionAction StrictTMVar m (Either SomeException a)
completionVar = do
      st <- StrictTVar m Status -> STM m Status
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m Status
muxStatus
      case st of
           Status
Ready    -> StrictTMVar m (Either SomeException a)
-> STM m (Either SomeException a)
forall (m :: * -> *) a. MonadSTM m => StrictTMVar m a -> STM m a
readTMVar StrictTMVar m (Either SomeException a)
completionVar
           Status
Stopping -> StrictTMVar m (Either SomeException a)
-> STM m (Either SomeException a)
forall (m :: * -> *) a. MonadSTM m => StrictTMVar m a -> STM m a
readTMVar StrictTMVar m (Either SomeException a)
completionVar
                   STM m (Either SomeException a)
-> STM m (Either SomeException a) -> STM m (Either SomeException a)
forall a. STM m a -> STM m a -> STM m a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> Either SomeException a -> STM m (Either SomeException a)
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return (SomeException -> Either SomeException a
forall a b. a -> Either a b
Left (SomeException -> Either SomeException a)
-> SomeException -> Either SomeException a
forall a b. (a -> b) -> a -> b
$ Error -> SomeException
forall e. Exception e => e -> SomeException
toException (Maybe SomeException -> Status -> Error
Shutdown Maybe SomeException
forall a. Maybe a
Nothing Status
st))
           Status
Stopped  -> StrictTMVar m (Either SomeException a)
-> STM m (Either SomeException a)
forall (m :: * -> *) a. MonadSTM m => StrictTMVar m a -> STM m a
readTMVar StrictTMVar m (Either SomeException a)
completionVar
                   STM m (Either SomeException a)
-> STM m (Either SomeException a) -> STM m (Either SomeException a)
forall a. STM m a -> STM m a -> STM m a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> Either SomeException a -> STM m (Either SomeException a)
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return (SomeException -> Either SomeException a
forall a b. a -> Either a b
Left (SomeException -> Either SomeException a)
-> SomeException -> Either SomeException a
forall a b. (a -> b) -> a -> b
$ Error -> SomeException
forall e. Exception e => e -> SomeException
toException (Maybe SomeException -> Status -> Error
Shutdown Maybe SomeException
forall a. Maybe a
Nothing Status
st))
           Failed SomeException
e -> StrictTMVar m (Either SomeException a)
-> STM m (Either SomeException a)
forall (m :: * -> *) a. MonadSTM m => StrictTMVar m a -> STM m a
readTMVar StrictTMVar m (Either SomeException a)
completionVar
                   STM m (Either SomeException a)
-> STM m (Either SomeException a) -> STM m (Either SomeException a)
forall a. STM m a -> STM m a -> STM m a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> Either SomeException a -> STM m (Either SomeException a)
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return (SomeException -> Either SomeException a
forall a b. a -> Either a b
Left (SomeException -> Either SomeException a)
-> SomeException -> Either SomeException a
forall a b. (a -> b) -> a -> b
$ Error -> SomeException
forall e. Exception e => e -> SomeException
toException (Maybe SomeException -> Status -> Error
Shutdown (SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just SomeException
e) Status
st))