{-# LANGUAGE BangPatterns          #-}
{-# LANGUAGE BlockArguments        #-}
{-# LANGUAGE DataKinds             #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE GADTs                 #-}
{-# LANGUAGE KindSignatures        #-}
{-# LANGUAGE LambdaCase            #-}
{-# LANGUAGE NamedFieldPuns        #-}
{-# LANGUAGE RankNTypes            #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-# LANGUAGE TypeOperators         #-}

-- 'runResponder' is using a redundant constraint.
{-# OPTIONS_GHC -Wno-redundant-constraints #-}
{-# OPTIONS_GHC -Wno-deferred-out-of-scope-variables #-}

-- | Server implementation based on 'ConnectionManager'
--
-- This module should be imported qualified.
--
module Ouroboros.Network.InboundGovernor
  ( -- * Run Inbound Protocol Governor
    PublicState (..)
  , Arguments (..)
  , with
    -- * Trace
  , Trace (..)
  , Debug (..)
  , Event (..)
  , NewConnectionInfo (..)
  , RemoteSt (..)
  , RemoteTransition
  , RemoteTransitionTrace
  , AcceptConnectionsPolicyTrace (..)
    -- * Re-exports
  , Transition' (..)
  , TransitionTrace' (..)
  , ResponderCounters (..)
    -- * API's exported for testing purposes
  , maturedPeers
  ) where

import Control.Applicative (Alternative)
import Control.Concurrent.Class.MonadSTM qualified as LazySTM
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Exception (SomeAsyncException (..))
import Control.Monad (foldM, forM_, forever, when)
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer (..), traceWith)

import Data.Bifunctor (first)
import Data.ByteString.Lazy (ByteString)
import Data.Cache
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe.Strict
import Data.Monoid.Synchronisation
import Data.OrdPSQ (OrdPSQ)
import Data.OrdPSQ qualified as OrdPSQ
import Data.Set (Set)
import Data.Set qualified as Set
import Data.Void (Void)

import Network.Mux qualified as Mux
import Network.Mux.Types qualified as Mux

import Ouroboros.Network.ConnectionHandler
import Ouroboros.Network.ConnectionManager.Types
import Ouroboros.Network.Context
import Ouroboros.Network.InboundGovernor.InformationChannel (InformationChannel)
import Ouroboros.Network.InboundGovernor.InformationChannel qualified as InfoChannel
import Ouroboros.Network.InboundGovernor.State
import Ouroboros.Network.Mux
import Ouroboros.Network.Server.RateLimiting

-- | Period of time after which a peer transitions from a fresh to a mature one,
-- see `matureDuplexPeers` and `freshDuplexPeers`.
--
inboundMaturePeerDelay :: DiffTime
inboundMaturePeerDelay :: DiffTime
inboundMaturePeerDelay = DiffTime
15 DiffTime -> DiffTime -> DiffTime
forall a. Num a => a -> a -> a
* DiffTime
60


-- | Every ~30s we wake up the inbound governor.  This is to give a chance to
-- mark some of the inbound connections as mature.
--
inactionTimeout :: DiffTime
inactionTimeout :: DiffTime
inactionTimeout = DiffTime
31.415927


data Arguments muxMode handlerTrace socket peerAddr initiatorCtx responderCtx
               handle handleError versionNumber versionData bytes m a b x =
  Arguments {
      forall (muxMode :: Mode) handlerTrace socket peerAddr initiatorCtx
       responderCtx handle handleError versionNumber versionData bytes
       (m :: * -> *) a b x.
Arguments
  muxMode
  handlerTrace
  socket
  peerAddr
  initiatorCtx
  responderCtx
  handle
  handleError
  versionNumber
  versionData
  bytes
  m
  a
  b
  x
-> Tracer m (RemoteTransitionTrace peerAddr)
transitionTracer   :: Tracer m (RemoteTransitionTrace peerAddr),
      -- ^ transition tracer
      forall (muxMode :: Mode) handlerTrace socket peerAddr initiatorCtx
       responderCtx handle handleError versionNumber versionData bytes
       (m :: * -> *) a b x.
Arguments
  muxMode
  handlerTrace
  socket
  peerAddr
  initiatorCtx
  responderCtx
  handle
  handleError
  versionNumber
  versionData
  bytes
  m
  a
  b
  x
-> Tracer m (Trace peerAddr)
tracer             :: Tracer m (Trace peerAddr),
      -- ^ main inbound governor tracer
      forall (muxMode :: Mode) handlerTrace socket peerAddr initiatorCtx
       responderCtx handle handleError versionNumber versionData bytes
       (m :: * -> *) a b x.
Arguments
  muxMode
  handlerTrace
  socket
  peerAddr
  initiatorCtx
  responderCtx
  handle
  handleError
  versionNumber
  versionData
  bytes
  m
  a
  b
  x
-> Tracer m (Debug peerAddr versionData)
debugTracer        :: Tracer m (Debug peerAddr versionData),
      -- ^ debug inbound governor tracer
      forall (muxMode :: Mode) handlerTrace socket peerAddr initiatorCtx
       responderCtx handle handleError versionNumber versionData bytes
       (m :: * -> *) a b x.
Arguments
  muxMode
  handlerTrace
  socket
  peerAddr
  initiatorCtx
  responderCtx
  handle
  handleError
  versionNumber
  versionData
  bytes
  m
  a
  b
  x
-> versionData -> DataFlow
connectionDataFlow :: versionData -> DataFlow,
      -- ^ connection data flow
      forall (muxMode :: Mode) handlerTrace socket peerAddr initiatorCtx
       responderCtx handle handleError versionNumber versionData bytes
       (m :: * -> *) a b x.
Arguments
  muxMode
  handlerTrace
  socket
  peerAddr
  initiatorCtx
  responderCtx
  handle
  handleError
  versionNumber
  versionData
  bytes
  m
  a
  b
  x
-> InboundGovernorInfoChannel
     muxMode initiatorCtx peerAddr versionData ByteString m a b
infoChannel        :: InboundGovernorInfoChannel muxMode initiatorCtx peerAddr versionData ByteString m a b,
      -- ^ 'InformationChannel' which passes 'NewConnectionInfo' for outbound
      -- connections from connection manager to the inbound governor.
      forall (muxMode :: Mode) handlerTrace socket peerAddr initiatorCtx
       responderCtx handle handleError versionNumber versionData bytes
       (m :: * -> *) a b x.
Arguments
  muxMode
  handlerTrace
  socket
  peerAddr
  initiatorCtx
  responderCtx
  handle
  handleError
  versionNumber
  versionData
  bytes
  m
  a
  b
  x
-> Maybe DiffTime
idleTimeout        :: Maybe DiffTime,
      -- ^ protocol idle timeout.  The remote site must restart a mini-protocol
      -- within given timeframe (Nothing indicates no timeout).
      forall (muxMode :: Mode) handlerTrace socket peerAddr initiatorCtx
       responderCtx handle handleError versionNumber versionData bytes
       (m :: * -> *) a b x.
Arguments
  muxMode
  handlerTrace
  socket
  peerAddr
  initiatorCtx
  responderCtx
  handle
  handleError
  versionNumber
  versionData
  bytes
  m
  a
  b
  x
-> ConnectionHandler
     muxMode
     handlerTrace
     socket
     peerAddr
     handle
     handleError
     versionNumber
     versionData
     m
-> (ConnectionManager muxMode socket peerAddr handle handleError m
    -> m x)
-> m x
withConnectionManager
        :: ConnectionHandler muxMode handlerTrace socket peerAddr handle handleError versionNumber versionData m
        -> (ConnectionManager muxMode socket peerAddr handle handleError m -> m x)
        -> m x,
      -- ^ connection manager continuation
      forall (muxMode :: Mode) handlerTrace socket peerAddr initiatorCtx
       responderCtx handle handleError versionNumber versionData bytes
       (m :: * -> *) a b x.
Arguments
  muxMode
  handlerTrace
  socket
  peerAddr
  initiatorCtx
  responderCtx
  handle
  handleError
  versionNumber
  versionData
  bytes
  m
  a
  b
  x
-> (StrictTVar m (StrictMaybe ResponderCounters)
    -> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
-> ConnectionHandler
     muxMode
     handlerTrace
     socket
     peerAddr
     handle
     handleError
     versionNumber
     versionData
     m
mkConnectionHandler
        :: (   StrictTVar m (StrictMaybe ResponderCounters)
            -> Tracer m (Mux.WithBearer (ConnectionId peerAddr) Mux.Trace))
        -> ConnectionHandler muxMode handlerTrace socket peerAddr handle handleError versionNumber versionData m
      -- ^ Connection handler builder, which injects a special tracer
      -- created here and routed into the muxer via the connection manager.
      -- The purpose is to inform the IG loop
      -- of miniprotocol responder activity such that proper and efficient
      -- peer cold/warm/hot transitions can be tracked.
    }


-- | Run the server, which consists of the following components:
--
-- * /inbound governor/, it corresponds to p2p-governor on outbound side
-- * /accept loop(s)/, one per given ip address.  We support up to one ipv4
--   address and up to one ipv6 address, i.e. an ipv6 enabled node will run two
--   accept loops on listening on different addresses with shared /inbound governor/.
--
-- The server can be run in either of two 'Network.Mux.Mode'-es:
--
-- * 'InitiatorResponderMode'
-- * 'ResponderMode'
--
-- The first one is used in data diffusion for /Node-To-Node protocol/, while the
-- other is useful for running a server for the /Node-To-Client protocol/.
--
with :: forall (muxMode :: Mux.Mode) socket peerAddr initiatorCtx responderCtx
               handle handlerTrace handleError versionData versionNumber bytes m a b x.
        ( Alternative (STM m)
        , MonadAsync       m
        , MonadCatch       m
        , MonadEvaluate    m
        , MonadLabelledSTM m
        , MonadThrow       m
        , MonadThrow  (STM m)
        , MonadTime        m
        , MonadTimer       m
        , MonadMask        m
        , Ord peerAddr
        , HasResponder muxMode ~ True
        , MonadTraceSTM m
        , MonadFork m
        , MonadDelay m
        , Show peerAddr
        )
     => Arguments muxMode handlerTrace socket peerAddr initiatorCtx responderCtx
                  handle handleError versionNumber versionData bytes m a b x
     -> (   Async m Void
         -> m (PublicState peerAddr versionData)
         -> ConnectionManager muxMode socket peerAddr handle handleError m
         -> m x)
     -> m x
with :: forall (muxMode :: Mode) socket peerAddr initiatorCtx responderCtx
       handle handlerTrace handleError versionData versionNumber bytes
       (m :: * -> *) a b x.
(Alternative (STM m), MonadAsync m, MonadCatch m, MonadEvaluate m,
 MonadLabelledSTM m, MonadThrow m, MonadThrow (STM m), MonadTime m,
 MonadTimer m, MonadMask m, Ord peerAddr,
 HasResponder muxMode ~ 'True, MonadTraceSTM m, MonadFork m,
 MonadDelay m, Show peerAddr) =>
Arguments
  muxMode
  handlerTrace
  socket
  peerAddr
  initiatorCtx
  responderCtx
  handle
  handleError
  versionNumber
  versionData
  bytes
  m
  a
  b
  x
-> (Async m Void
    -> m (PublicState peerAddr versionData)
    -> ConnectionManager muxMode socket peerAddr handle handleError m
    -> m x)
-> m x
with
    Arguments {
      transitionTracer :: forall (muxMode :: Mode) handlerTrace socket peerAddr initiatorCtx
       responderCtx handle handleError versionNumber versionData bytes
       (m :: * -> *) a b x.
Arguments
  muxMode
  handlerTrace
  socket
  peerAddr
  initiatorCtx
  responderCtx
  handle
  handleError
  versionNumber
  versionData
  bytes
  m
  a
  b
  x
-> Tracer m (RemoteTransitionTrace peerAddr)
transitionTracer = Tracer m (RemoteTransitionTrace peerAddr)
trTracer,
      Tracer m (Trace peerAddr)
tracer :: forall (muxMode :: Mode) handlerTrace socket peerAddr initiatorCtx
       responderCtx handle handleError versionNumber versionData bytes
       (m :: * -> *) a b x.
Arguments
  muxMode
  handlerTrace
  socket
  peerAddr
  initiatorCtx
  responderCtx
  handle
  handleError
  versionNumber
  versionData
  bytes
  m
  a
  b
  x
-> Tracer m (Trace peerAddr)
tracer :: Tracer m (Trace peerAddr)
tracer,
      Tracer m (Debug peerAddr versionData)
debugTracer :: forall (muxMode :: Mode) handlerTrace socket peerAddr initiatorCtx
       responderCtx handle handleError versionNumber versionData bytes
       (m :: * -> *) a b x.
Arguments
  muxMode
  handlerTrace
  socket
  peerAddr
  initiatorCtx
  responderCtx
  handle
  handleError
  versionNumber
  versionData
  bytes
  m
  a
  b
  x
-> Tracer m (Debug peerAddr versionData)
debugTracer :: Tracer m (Debug peerAddr versionData)
debugTracer,
      versionData -> DataFlow
connectionDataFlow :: forall (muxMode :: Mode) handlerTrace socket peerAddr initiatorCtx
       responderCtx handle handleError versionNumber versionData bytes
       (m :: * -> *) a b x.
Arguments
  muxMode
  handlerTrace
  socket
  peerAddr
  initiatorCtx
  responderCtx
  handle
  handleError
  versionNumber
  versionData
  bytes
  m
  a
  b
  x
-> versionData -> DataFlow
connectionDataFlow :: versionData -> DataFlow
connectionDataFlow,
      Maybe DiffTime
idleTimeout :: forall (muxMode :: Mode) handlerTrace socket peerAddr initiatorCtx
       responderCtx handle handleError versionNumber versionData bytes
       (m :: * -> *) a b x.
Arguments
  muxMode
  handlerTrace
  socket
  peerAddr
  initiatorCtx
  responderCtx
  handle
  handleError
  versionNumber
  versionData
  bytes
  m
  a
  b
  x
-> Maybe DiffTime
idleTimeout :: Maybe DiffTime
idleTimeout,
      InboundGovernorInfoChannel
  muxMode initiatorCtx peerAddr versionData ByteString m a b
infoChannel :: forall (muxMode :: Mode) handlerTrace socket peerAddr initiatorCtx
       responderCtx handle handleError versionNumber versionData bytes
       (m :: * -> *) a b x.
Arguments
  muxMode
  handlerTrace
  socket
  peerAddr
  initiatorCtx
  responderCtx
  handle
  handleError
  versionNumber
  versionData
  bytes
  m
  a
  b
  x
-> InboundGovernorInfoChannel
     muxMode initiatorCtx peerAddr versionData ByteString m a b
infoChannel :: InboundGovernorInfoChannel
  muxMode initiatorCtx peerAddr versionData ByteString m a b
infoChannel,
      ConnectionHandler
  muxMode
  handlerTrace
  socket
  peerAddr
  handle
  handleError
  versionNumber
  versionData
  m
-> (ConnectionManager muxMode socket peerAddr handle handleError m
    -> m x)
-> m x
withConnectionManager :: forall (muxMode :: Mode) handlerTrace socket peerAddr initiatorCtx
       responderCtx handle handleError versionNumber versionData bytes
       (m :: * -> *) a b x.
Arguments
  muxMode
  handlerTrace
  socket
  peerAddr
  initiatorCtx
  responderCtx
  handle
  handleError
  versionNumber
  versionData
  bytes
  m
  a
  b
  x
-> ConnectionHandler
     muxMode
     handlerTrace
     socket
     peerAddr
     handle
     handleError
     versionNumber
     versionData
     m
-> (ConnectionManager muxMode socket peerAddr handle handleError m
    -> m x)
-> m x
withConnectionManager :: ConnectionHandler
  muxMode
  handlerTrace
  socket
  peerAddr
  handle
  handleError
  versionNumber
  versionData
  m
-> (ConnectionManager muxMode socket peerAddr handle handleError m
    -> m x)
-> m x
withConnectionManager,
      (StrictTVar m (StrictMaybe ResponderCounters)
 -> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
-> ConnectionHandler
     muxMode
     handlerTrace
     socket
     peerAddr
     handle
     handleError
     versionNumber
     versionData
     m
mkConnectionHandler :: forall (muxMode :: Mode) handlerTrace socket peerAddr initiatorCtx
       responderCtx handle handleError versionNumber versionData bytes
       (m :: * -> *) a b x.
Arguments
  muxMode
  handlerTrace
  socket
  peerAddr
  initiatorCtx
  responderCtx
  handle
  handleError
  versionNumber
  versionData
  bytes
  m
  a
  b
  x
-> (StrictTVar m (StrictMaybe ResponderCounters)
    -> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
-> ConnectionHandler
     muxMode
     handlerTrace
     socket
     peerAddr
     handle
     handleError
     versionNumber
     versionData
     m
mkConnectionHandler :: (StrictTVar m (StrictMaybe ResponderCounters)
 -> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
-> ConnectionHandler
     muxMode
     handlerTrace
     socket
     peerAddr
     handle
     handleError
     versionNumber
     versionData
     m
mkConnectionHandler
    }
    Async m Void
-> m (PublicState peerAddr versionData)
-> ConnectionManager muxMode socket peerAddr handle handleError m
-> m x
k
    = do
    stateVar <- State muxMode initiatorCtx peerAddr versionData m a b
-> m (StrictTVar
        m (State muxMode initiatorCtx peerAddr versionData m a b))
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO State muxMode initiatorCtx peerAddr versionData m a b
emptyState
    active   <- newTVarIO True -- ^ inbound governor status: True = Active
    let connectionHandler =
          (StrictTVar m (StrictMaybe ResponderCounters)
 -> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
-> ConnectionHandler
     muxMode
     handlerTrace
     socket
     peerAddr
     handle
     handleError
     versionNumber
     versionData
     m
mkConnectionHandler ((StrictTVar m (StrictMaybe ResponderCounters)
  -> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
 -> ConnectionHandler
      muxMode
      handlerTrace
      socket
      peerAddr
      handle
      handleError
      versionNumber
      versionData
      m)
-> (StrictTVar m (StrictMaybe ResponderCounters)
    -> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
-> ConnectionHandler
     muxMode
     handlerTrace
     socket
     peerAddr
     handle
     handleError
     versionNumber
     versionData
     m
forall a b. (a -> b) -> a -> b
$ InboundGovernorInfoChannel
  muxMode initiatorCtx peerAddr versionData ByteString m a b
-> (versionData -> DataFlow)
-> StrictTVar
     m (State muxMode initiatorCtx peerAddr versionData m a b)
-> StrictTVar m Bool
-> StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace)
forall (m :: * -> *) peerAddr (muxMode :: Mode) initiatorCtx
       versionData a b.
(MonadSTM m, Ord peerAddr) =>
InboundGovernorInfoChannel
  muxMode initiatorCtx peerAddr versionData ByteString m a b
-> (versionData -> DataFlow)
-> StrictTVar
     m (State muxMode initiatorCtx peerAddr versionData m a b)
-> StrictTVar m Bool
-> StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace)
inboundGovernorMuxTracer InboundGovernorInfoChannel
  muxMode initiatorCtx peerAddr versionData ByteString m a b
infoChannel
                                                         versionData -> DataFlow
connectionDataFlow
                                                         StrictTVar
  m (State muxMode initiatorCtx peerAddr versionData m a b)
stateVar
                                                         StrictTVar m Bool
active
    withConnectionManager connectionHandler \ConnectionManager muxMode socket peerAddr handle handleError m
connectionManager ->
      m Void -> (Async m Void -> m x) -> m x
forall a b. m a -> (Async m a -> m b) -> m b
forall (m :: * -> *) a b.
MonadAsync m =>
m a -> (Async m a -> m b) -> m b
withAsync
        (  String -> m ()
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread String
"inbound-governor-loop" m () -> m Void -> m Void
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>>
           m () -> m Void
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (ConnectionManager muxMode socket peerAddr handle handleError m
-> StrictTVar
     m (State muxMode initiatorCtx peerAddr versionData m a b)
-> m ()
inboundGovernorStep ConnectionManager muxMode socket peerAddr handle handleError m
connectionManager StrictTVar
  m (State muxMode initiatorCtx peerAddr versionData m a b)
stateVar m () -> m () -> m ()
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> m ()
forall (m :: * -> *). MonadFork m => m ()
yield)
         m Void -> (SomeException -> m Void) -> m Void
forall e a. Exception e => m a -> (e -> m a) -> m a
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` \SomeException
e -> do
           -- following the next statement, the ig tracer will no longer
           -- write to the info channel queue.
           STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTVar m Bool -> Bool -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m Bool
active Bool
False
           -- To avoid the risk of a full information channel queue
           -- and blocking on mux traces which will prevent connection cleanup,
           -- we drain it here just in case one last time.
           _ <- STM
  m
  [Event
     muxMode
     (Handle
        muxMode
        initiatorCtx
        (ResponderContext peerAddr)
        versionData
        ByteString
        m
        a
        b)
     initiatorCtx
     peerAddr
     versionData
     m
     a
     b]
-> m [Event
        muxMode
        (Handle
           muxMode
           initiatorCtx
           (ResponderContext peerAddr)
           versionData
           ByteString
           m
           a
           b)
        initiatorCtx
        peerAddr
        versionData
        m
        a
        b]
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM
   m
   [Event
      muxMode
      (Handle
         muxMode
         initiatorCtx
         (ResponderContext peerAddr)
         versionData
         ByteString
         m
         a
         b)
      initiatorCtx
      peerAddr
      versionData
      m
      a
      b]
 -> m [Event
         muxMode
         (Handle
            muxMode
            initiatorCtx
            (ResponderContext peerAddr)
            versionData
            ByteString
            m
            a
            b)
         initiatorCtx
         peerAddr
         versionData
         m
         a
         b])
-> STM
     m
     [Event
        muxMode
        (Handle
           muxMode
           initiatorCtx
           (ResponderContext peerAddr)
           versionData
           ByteString
           m
           a
           b)
        initiatorCtx
        peerAddr
        versionData
        m
        a
        b]
-> m [Event
        muxMode
        (Handle
           muxMode
           initiatorCtx
           (ResponderContext peerAddr)
           versionData
           ByteString
           m
           a
           b)
        initiatorCtx
        peerAddr
        versionData
        m
        a
        b]
forall a b. (a -> b) -> a -> b
$ InboundGovernorInfoChannel
  muxMode initiatorCtx peerAddr versionData ByteString m a b
-> STM
     m
     [Event
        muxMode
        (Handle
           muxMode
           initiatorCtx
           (ResponderContext peerAddr)
           versionData
           ByteString
           m
           a
           b)
        initiatorCtx
        peerAddr
        versionData
        m
        a
        b]
forall a (m :: * -> *). InformationChannel a m -> STM m [a]
InfoChannel.readMessages InboundGovernorInfoChannel
  muxMode initiatorCtx peerAddr versionData ByteString m a b
infoChannel
           handleError stateVar e)
      \Async m Void
thread ->
        Async m Void
-> m (PublicState peerAddr versionData)
-> ConnectionManager muxMode socket peerAddr handle handleError m
-> m x
k Async m Void
thread (State muxMode initiatorCtx peerAddr versionData m a b
-> PublicState peerAddr versionData
forall (muxMode :: Mode) initatorCtx versionData peerAddr
       (m :: * -> *) a b.
State muxMode initatorCtx peerAddr versionData m a b
-> PublicState peerAddr versionData
mkPublicState (State muxMode initiatorCtx peerAddr versionData m a b
 -> PublicState peerAddr versionData)
-> m (State muxMode initiatorCtx peerAddr versionData m a b)
-> m (PublicState peerAddr versionData)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StrictTVar
  m (State muxMode initiatorCtx peerAddr versionData m a b)
-> m (State muxMode initiatorCtx peerAddr versionData m a b)
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> m a
readTVarIO StrictTVar
  m (State muxMode initiatorCtx peerAddr versionData m a b)
stateVar) ConnectionManager muxMode socket peerAddr handle handleError m
connectionManager
  where
    emptyState :: State muxMode initiatorCtx peerAddr versionData m a b
    emptyState :: State muxMode initiatorCtx peerAddr versionData m a b
emptyState = State {
        connections :: Map
  (ConnectionId peerAddr)
  (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
connections       = Map
  (ConnectionId peerAddr)
  (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
forall k a. Map k a
Map.empty,
        matureDuplexPeers :: Map peerAddr versionData
matureDuplexPeers = Map peerAddr versionData
forall k a. Map k a
Map.empty,
        freshDuplexPeers :: OrdPSQ peerAddr Time versionData
freshDuplexPeers  = OrdPSQ peerAddr Time versionData
forall k p v. OrdPSQ k p v
OrdPSQ.empty,
        countersCache :: Cache Counters
countersCache     = Cache Counters
forall a. Monoid a => a
mempty
      }

    -- Trace final transition mostly for testing purposes.
    --
    -- NOTE: `inboundGovernorLoop` doesn't throw synchronous exceptions, this is
    -- just need to handle asynchronous exceptions.
    handleError
      :: StrictTVar m (State muxMode initiatorCtx peerAddr versionData m a b)
      -> SomeException
      -> m Void
    handleError :: StrictTVar
  m (State muxMode initiatorCtx peerAddr versionData m a b)
-> SomeException -> m Void
handleError StrictTVar
  m (State muxMode initiatorCtx peerAddr versionData m a b)
var SomeException
e = do
      PublicState { remoteStateMap } <- State muxMode initiatorCtx peerAddr versionData m a b
-> PublicState peerAddr versionData
forall (muxMode :: Mode) initatorCtx versionData peerAddr
       (m :: * -> *) a b.
State muxMode initatorCtx peerAddr versionData m a b
-> PublicState peerAddr versionData
mkPublicState (State muxMode initiatorCtx peerAddr versionData m a b
 -> PublicState peerAddr versionData)
-> m (State muxMode initiatorCtx peerAddr versionData m a b)
-> m (PublicState peerAddr versionData)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StrictTVar
  m (State muxMode initiatorCtx peerAddr versionData m a b)
-> m (State muxMode initiatorCtx peerAddr versionData m a b)
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> m a
readTVarIO StrictTVar
  m (State muxMode initiatorCtx peerAddr versionData m a b)
var
      _ <- Map.traverseWithKey
             (\ConnectionId peerAddr
connId RemoteSt
remoteSt ->
               Tracer m (RemoteTransitionTrace peerAddr)
-> RemoteTransitionTrace peerAddr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (RemoteTransitionTrace peerAddr)
trTracer (RemoteTransitionTrace peerAddr -> m ())
-> RemoteTransitionTrace peerAddr -> m ()
forall a b. (a -> b) -> a -> b
$
                 peerAddr
-> Transition' (Maybe RemoteSt) -> RemoteTransitionTrace peerAddr
forall id state.
id -> Transition' state -> TransitionTrace' id state
TransitionTrace (ConnectionId peerAddr -> peerAddr
forall addr. ConnectionId addr -> addr
remoteAddress ConnectionId peerAddr
connId)
                 Transition { fromState :: Maybe RemoteSt
fromState = RemoteSt -> Maybe RemoteSt
forall a. a -> Maybe a
Just RemoteSt
remoteSt,
                              toState :: Maybe RemoteSt
toState   = Maybe RemoteSt
forall a. Maybe a
Nothing }
             )
             remoteStateMap
      throwIO e

    -- The inbound protocol governor single step, which may
    -- process multipe events from the information channel
    inboundGovernorStep
      :: ConnectionManager muxMode socket peerAddr handle handleError m
      -> StrictTVar m (State muxMode initiatorCtx peerAddr versionData m a b)
      -> m ()
    inboundGovernorStep :: ConnectionManager muxMode socket peerAddr handle handleError m
-> StrictTVar
     m (State muxMode initiatorCtx peerAddr versionData m a b)
-> m ()
inboundGovernorStep ConnectionManager muxMode socket peerAddr handle handleError m
connectionManager StrictTVar
  m (State muxMode initiatorCtx peerAddr versionData m a b)
stateVar = do
      time <- m Time
forall (m :: * -> *). MonadMonotonicTime m => m Time
getMonotonicTime
      inactivityVar <- registerDelay inactionTimeout
      events <- atomically do
        state <- readTVar stateVar
        runFirstToFinish $
            -- we deliberately read the info channel queue after
            -- the relevant item in each firsttofinish to limit
            -- contention
            FirstToFinish do
              -- mark connections as mature
              case maturedPeers time (freshDuplexPeers state) of
                (Map peerAddr versionData
as, OrdPSQ peerAddr Time versionData
_)     | Map peerAddr versionData -> Bool
forall k a. Map k a -> Bool
Map.null Map peerAddr versionData
as
                            -> STM
  m
  [Event
     muxMode
     (Handle
        muxMode
        initiatorCtx
        (ResponderContext peerAddr)
        versionData
        ByteString
        m
        a
        b)
     initiatorCtx
     peerAddr
     versionData
     m
     a
     b]
forall a. STM m a
forall (m :: * -> *) a. MonadSTM m => STM m a
retry
                (Map peerAddr versionData
as, OrdPSQ peerAddr Time versionData
fresh) ->
                  (Map peerAddr versionData
-> OrdPSQ peerAddr Time versionData
-> Event
     muxMode
     (Handle
        muxMode
        initiatorCtx
        (ResponderContext peerAddr)
        versionData
        ByteString
        m
        a
        b)
     initiatorCtx
     peerAddr
     versionData
     m
     a
     b
forall (muxMode :: Mode) handle initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
Map peerAddr versionData
-> OrdPSQ peerAddr Time versionData
-> Event muxMode handle initiatorCtx peerAddr versionData m a b
MaturedDuplexPeers Map peerAddr versionData
as OrdPSQ peerAddr Time versionData
fresh Event
  muxMode
  (Handle
     muxMode
     initiatorCtx
     (ResponderContext peerAddr)
     versionData
     ByteString
     m
     a
     b)
  initiatorCtx
  peerAddr
  versionData
  m
  a
  b
-> [Event
      muxMode
      (Handle
         muxMode
         initiatorCtx
         (ResponderContext peerAddr)
         versionData
         ByteString
         m
         a
         b)
      initiatorCtx
      peerAddr
      versionData
      m
      a
      b]
-> [Event
      muxMode
      (Handle
         muxMode
         initiatorCtx
         (ResponderContext peerAddr)
         versionData
         ByteString
         m
         a
         b)
      initiatorCtx
      peerAddr
      versionData
      m
      a
      b]
forall a. a -> [a] -> [a]
:) ([Event
    muxMode
    (Handle
       muxMode
       initiatorCtx
       (ResponderContext peerAddr)
       versionData
       ByteString
       m
       a
       b)
    initiatorCtx
    peerAddr
    versionData
    m
    a
    b]
 -> [Event
       muxMode
       (Handle
          muxMode
          initiatorCtx
          (ResponderContext peerAddr)
          versionData
          ByteString
          m
          a
          b)
       initiatorCtx
       peerAddr
       versionData
       m
       a
       b])
-> STM
     m
     [Event
        muxMode
        (Handle
           muxMode
           initiatorCtx
           (ResponderContext peerAddr)
           versionData
           ByteString
           m
           a
           b)
        initiatorCtx
        peerAddr
        versionData
        m
        a
        b]
-> STM
     m
     [Event
        muxMode
        (Handle
           muxMode
           initiatorCtx
           (ResponderContext peerAddr)
           versionData
           ByteString
           m
           a
           b)
        initiatorCtx
        peerAddr
        versionData
        m
        a
        b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> InboundGovernorInfoChannel
  muxMode initiatorCtx peerAddr versionData ByteString m a b
-> STM
     m
     [Event
        muxMode
        (Handle
           muxMode
           initiatorCtx
           (ResponderContext peerAddr)
           versionData
           ByteString
           m
           a
           b)
        initiatorCtx
        peerAddr
        versionData
        m
        a
        b]
forall a (m :: * -> *). InformationChannel a m -> STM m [a]
InfoChannel.readMessages InboundGovernorInfoChannel
  muxMode initiatorCtx peerAddr versionData ByteString m a b
infoChannel
         <> FirstToFinish do
              firstCommit <- runFirstToFinish $
                Map.foldMapWithKey firstPeerCommitRemote (connections state)
              -- it is important we read the channel here, and join it after
              -- the firstCommit. Registering responder starts are atomic wrt
              -- handling an expired peer in the tracer. If the CM drops the
              -- expired connection (CommitRemote below), the tracer must not
              -- register a promotion activity.
              (firstCommit :) <$> InfoChannel.readMessages infoChannel
         <> FirstToFinish do
              muxEvents <- InfoChannel.readMessages infoChannel
              check (not . null $ muxEvents) >> pure muxEvents
         <> FirstToFinish do
              -- spin the inbound governor loop; it will re-run with new
              -- time, which allows to make some peers mature.
              LazySTM.readTVar inactivityVar >>= check >> pure [InactivityTimeout]

      forM_ events \Event
  muxMode
  (Handle
     muxMode
     initiatorCtx
     (ResponderContext peerAddr)
     versionData
     ByteString
     m
     a
     b)
  initiatorCtx
  peerAddr
  versionData
  m
  a
  b
event -> do
        state <- StrictTVar
  m (State muxMode initiatorCtx peerAddr versionData m a b)
-> m (State muxMode initiatorCtx peerAddr versionData m a b)
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> m a
readTVarIO StrictTVar
  m (State muxMode initiatorCtx peerAddr versionData m a b)
stateVar
        decision <- case event of
          NewConnection
            -- new connection has been announced by either accept loop or
            -- by connection manager (in which case the connection is in
            -- 'DuplexState').
            (NewConnectionInfo
              Provenance
provenance
              ConnectionId peerAddr
connId
              DataFlow
dataFlow
              Handle {
                hMux :: forall (muxMode :: Mode) initiatorCtx responderCtx versionData
       bytes (m :: * -> *) a b.
Handle muxMode initiatorCtx responderCtx versionData bytes m a b
-> Mux muxMode m
hMux         = Mux muxMode m
csMux,
                hMuxBundle :: forall (muxMode :: Mode) initiatorCtx responderCtx versionData
       bytes (m :: * -> *) a b.
Handle muxMode initiatorCtx responderCtx versionData bytes m a b
-> OuroborosBundle muxMode initiatorCtx responderCtx bytes m a b
hMuxBundle   = OuroborosBundle
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
muxBundle,
                hVersionData :: forall (muxMode :: Mode) initiatorCtx responderCtx versionData
       bytes (m :: * -> *) a b.
Handle muxMode initiatorCtx responderCtx versionData bytes m a b
-> versionData
hVersionData = versionData
csVersionData
              }) -> do

                Tracer m (Trace peerAddr) -> Trace peerAddr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (Trace peerAddr)
tracer (Provenance -> ConnectionId peerAddr -> Trace peerAddr
forall peerAddr.
Provenance -> ConnectionId peerAddr -> Trace peerAddr
TrNewConnection Provenance
provenance ConnectionId peerAddr
connId)
                let responderContext :: ResponderContext peerAddr
responderContext = ResponderContext { rcConnectionId :: ConnectionId peerAddr
rcConnectionId = ConnectionId peerAddr
connId }

                connections <- (Maybe
   (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
 -> m (Maybe
         (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)))
-> ConnectionId peerAddr
-> Map
     (ConnectionId peerAddr)
     (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
-> m (Map
        (ConnectionId peerAddr)
        (ConnectionState muxMode initiatorCtx peerAddr versionData m a b))
forall (f :: * -> *) k a.
(Functor f, Ord k) =>
(Maybe a -> f (Maybe a)) -> k -> Map k a -> f (Map k a)
Map.alterF
                  (\case
                    -- connection
                    Maybe
  (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
Nothing -> do
                      let csMPMHot :: [(MiniProtocolNum,
  MiniProtocolData muxMode initiatorCtx peerAddr m a b)]
csMPMHot =
                            [ ( MiniProtocol
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
-> MiniProtocolNum
forall (mode :: Mode) initiatorCtx responderCtx bytes (m :: * -> *)
       a b.
MiniProtocol mode initiatorCtx responderCtx bytes m a b
-> MiniProtocolNum
miniProtocolNum MiniProtocol
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
mpH
                              , MiniProtocol
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
-> ResponderContext peerAddr
-> ProtocolTemperature
-> MiniProtocolData muxMode initiatorCtx peerAddr m a b
forall (muxMode :: Mode) initiatorCtx peerAddr (m :: * -> *) a b.
MiniProtocol
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
-> ResponderContext peerAddr
-> ProtocolTemperature
-> MiniProtocolData muxMode initiatorCtx peerAddr m a b
MiniProtocolData MiniProtocol
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
mpH ResponderContext peerAddr
responderContext ProtocolTemperature
Hot
                              )
                            | MiniProtocol
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
mpH <- SingProtocolTemperature 'Hot
-> OuroborosBundle
     muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
-> [MiniProtocol
      muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b]
forall (pt :: ProtocolTemperature) a.
SingProtocolTemperature pt -> TemperatureBundle a -> a
projectBundle SingProtocolTemperature 'Hot
SingHot OuroborosBundle
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
muxBundle
                            ]
                          csMPMWarm :: [(MiniProtocolNum,
  MiniProtocolData muxMode initiatorCtx peerAddr m a b)]
csMPMWarm =
                            [ ( MiniProtocol
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
-> MiniProtocolNum
forall (mode :: Mode) initiatorCtx responderCtx bytes (m :: * -> *)
       a b.
MiniProtocol mode initiatorCtx responderCtx bytes m a b
-> MiniProtocolNum
miniProtocolNum MiniProtocol
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
mpW
                              , MiniProtocol
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
-> ResponderContext peerAddr
-> ProtocolTemperature
-> MiniProtocolData muxMode initiatorCtx peerAddr m a b
forall (muxMode :: Mode) initiatorCtx peerAddr (m :: * -> *) a b.
MiniProtocol
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
-> ResponderContext peerAddr
-> ProtocolTemperature
-> MiniProtocolData muxMode initiatorCtx peerAddr m a b
MiniProtocolData MiniProtocol
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
mpW ResponderContext peerAddr
responderContext ProtocolTemperature
Warm
                              )
                            | MiniProtocol
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
mpW <- SingProtocolTemperature 'Warm
-> OuroborosBundle
     muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
-> [MiniProtocol
      muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b]
forall (pt :: ProtocolTemperature) a.
SingProtocolTemperature pt -> TemperatureBundle a -> a
projectBundle SingProtocolTemperature 'Warm
SingWarm OuroborosBundle
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
muxBundle
                            ]
                          csMPMEstablished :: [(MiniProtocolNum,
  MiniProtocolData muxMode initiatorCtx peerAddr m a b)]
csMPMEstablished =
                            [ ( MiniProtocol
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
-> MiniProtocolNum
forall (mode :: Mode) initiatorCtx responderCtx bytes (m :: * -> *)
       a b.
MiniProtocol mode initiatorCtx responderCtx bytes m a b
-> MiniProtocolNum
miniProtocolNum MiniProtocol
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
mpE
                              , MiniProtocol
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
-> ResponderContext peerAddr
-> ProtocolTemperature
-> MiniProtocolData muxMode initiatorCtx peerAddr m a b
forall (muxMode :: Mode) initiatorCtx peerAddr (m :: * -> *) a b.
MiniProtocol
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
-> ResponderContext peerAddr
-> ProtocolTemperature
-> MiniProtocolData muxMode initiatorCtx peerAddr m a b
MiniProtocolData MiniProtocol
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
mpE ResponderContext peerAddr
responderContext ProtocolTemperature
Established
                              )
                            | MiniProtocol
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
mpE <- SingProtocolTemperature 'Established
-> OuroborosBundle
     muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
-> [MiniProtocol
      muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b]
forall (pt :: ProtocolTemperature) a.
SingProtocolTemperature pt -> TemperatureBundle a -> a
projectBundle SingProtocolTemperature 'Established
SingEstablished OuroborosBundle
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
muxBundle
                            ]
                          csMiniProtocolMap :: Map
  MiniProtocolNum
  (MiniProtocolData muxMode initiatorCtx peerAddr m a b)
csMiniProtocolMap =
                              [(MiniProtocolNum,
  MiniProtocolData muxMode initiatorCtx peerAddr m a b)]
-> Map
     MiniProtocolNum
     (MiniProtocolData muxMode initiatorCtx peerAddr m a b)
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList
                              ([(MiniProtocolNum,
  MiniProtocolData muxMode initiatorCtx peerAddr m a b)]
csMPMHot [(MiniProtocolNum,
  MiniProtocolData muxMode initiatorCtx peerAddr m a b)]
-> [(MiniProtocolNum,
     MiniProtocolData muxMode initiatorCtx peerAddr m a b)]
-> [(MiniProtocolNum,
     MiniProtocolData muxMode initiatorCtx peerAddr m a b)]
forall a. [a] -> [a] -> [a]
++ [(MiniProtocolNum,
  MiniProtocolData muxMode initiatorCtx peerAddr m a b)]
csMPMWarm [(MiniProtocolNum,
  MiniProtocolData muxMode initiatorCtx peerAddr m a b)]
-> [(MiniProtocolNum,
     MiniProtocolData muxMode initiatorCtx peerAddr m a b)]
-> [(MiniProtocolNum,
     MiniProtocolData muxMode initiatorCtx peerAddr m a b)]
forall a. [a] -> [a] -> [a]
++ [(MiniProtocolNum,
  MiniProtocolData muxMode initiatorCtx peerAddr m a b)]
csMPMEstablished)

                      mCompletionMap
                        <-
                        (Maybe (Map MiniProtocolNum (STM m (Either SomeException b)))
 -> MiniProtocolData muxMode initiatorCtx peerAddr m a b
 -> m (Maybe
         (Map MiniProtocolNum (STM m (Either SomeException b)))))
-> Maybe (Map MiniProtocolNum (STM m (Either SomeException b)))
-> Map
     MiniProtocolNum
     (MiniProtocolData muxMode initiatorCtx peerAddr m a b)
-> m (Maybe (Map MiniProtocolNum (STM m (Either SomeException b))))
forall (t :: * -> *) (m :: * -> *) b a.
(Foldable t, Monad m) =>
(b -> a -> m b) -> b -> t a -> m b
foldM
                          (\Maybe (Map MiniProtocolNum (STM m (Either SomeException b)))
acc mpd :: MiniProtocolData muxMode initiatorCtx peerAddr m a b
mpd@MiniProtocolData { MiniProtocol
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
mpdMiniProtocol :: MiniProtocol
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
mpdMiniProtocol :: forall (muxMode :: Mode) initiatorCtx peerAddr (m :: * -> *) a b.
MiniProtocolData muxMode initiatorCtx peerAddr m a b
-> MiniProtocol
     muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
mpdMiniProtocol } ->
                            Mux muxMode m
-> MiniProtocolData muxMode initiatorCtx peerAddr m a b
-> m (Either SomeException (STM m (Either SomeException b)))
forall (mode :: Mode) initiatorCtx peerAddr (m :: * -> *) a b.
(Alternative (STM m), HasResponder mode ~ 'True, MonadAsync m,
 MonadLabelledSTM m, MonadCatch m, MonadMask m,
 MonadThrow (STM m)) =>
Mux mode m
-> MiniProtocolData mode initiatorCtx peerAddr m a b
-> m (Either SomeException (STM m (Either SomeException b)))
runResponder Mux muxMode m
csMux MiniProtocolData muxMode initiatorCtx peerAddr m a b
mpd m (Either SomeException (STM m (Either SomeException b)))
-> (Either SomeException (STM m (Either SomeException b))
    -> m (Maybe
            (Map MiniProtocolNum (STM m (Either SomeException b)))))
-> m (Maybe (Map MiniProtocolNum (STM m (Either SomeException b))))
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
                              -- synchronous exceptions when starting
                              -- a mini-protocol are non-recoverable; we
                              -- close the connection and allow the server
                              -- to continue.
                              Left SomeException
err -> do
                                Tracer m (Trace peerAddr) -> Trace peerAddr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (Trace peerAddr)
tracer (ConnectionId peerAddr
-> MiniProtocolNum -> SomeException -> Trace peerAddr
forall peerAddr.
ConnectionId peerAddr
-> MiniProtocolNum -> SomeException -> Trace peerAddr
TrResponderStartFailure ConnectionId peerAddr
connId (MiniProtocol
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
-> MiniProtocolNum
forall (mode :: Mode) initiatorCtx responderCtx bytes (m :: * -> *)
       a b.
MiniProtocol mode initiatorCtx responderCtx bytes m a b
-> MiniProtocolNum
miniProtocolNum MiniProtocol
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
mpdMiniProtocol) SomeException
err)
                                Mux muxMode m -> m ()
forall (m :: * -> *) (mode :: Mode).
MonadSTM m =>
Mux mode m -> m ()
Mux.stop Mux muxMode m
csMux
                                Maybe (Map MiniProtocolNum (STM m (Either SomeException b)))
-> m (Maybe (Map MiniProtocolNum (STM m (Either SomeException b))))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (Map MiniProtocolNum (STM m (Either SomeException b)))
forall a. Maybe a
Nothing

                              Right STM m (Either SomeException b)
completion ->  do
                                let acc' :: Maybe (Map MiniProtocolNum (STM m (Either SomeException b)))
acc' = MiniProtocolNum
-> STM m (Either SomeException b)
-> Map MiniProtocolNum (STM m (Either SomeException b))
-> Map MiniProtocolNum (STM m (Either SomeException b))
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert (MiniProtocol
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
-> MiniProtocolNum
forall (mode :: Mode) initiatorCtx responderCtx bytes (m :: * -> *)
       a b.
MiniProtocol mode initiatorCtx responderCtx bytes m a b
-> MiniProtocolNum
miniProtocolNum MiniProtocol
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
mpdMiniProtocol)
                                                      STM m (Either SomeException b)
completion
                                       (Map MiniProtocolNum (STM m (Either SomeException b))
 -> Map MiniProtocolNum (STM m (Either SomeException b)))
-> Maybe (Map MiniProtocolNum (STM m (Either SomeException b)))
-> Maybe (Map MiniProtocolNum (STM m (Either SomeException b)))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (Map MiniProtocolNum (STM m (Either SomeException b)))
acc
                                -- force under lazy 'Maybe'
                                case Maybe (Map MiniProtocolNum (STM m (Either SomeException b)))
acc' of
                                  Just !Map MiniProtocolNum (STM m (Either SomeException b))
_ -> Maybe (Map MiniProtocolNum (STM m (Either SomeException b)))
-> m (Maybe (Map MiniProtocolNum (STM m (Either SomeException b))))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (Map MiniProtocolNum (STM m (Either SomeException b)))
acc'
                                  Maybe (Map MiniProtocolNum (STM m (Either SomeException b)))
Nothing -> Maybe (Map MiniProtocolNum (STM m (Either SomeException b)))
-> m (Maybe (Map MiniProtocolNum (STM m (Either SomeException b))))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (Map MiniProtocolNum (STM m (Either SomeException b)))
acc'
                          )
                          (Map MiniProtocolNum (STM m (Either SomeException b))
-> Maybe (Map MiniProtocolNum (STM m (Either SomeException b)))
forall a. a -> Maybe a
Just Map MiniProtocolNum (STM m (Either SomeException b))
forall k a. Map k a
Map.empty)
                          Map
  MiniProtocolNum
  (MiniProtocolData muxMode initiatorCtx peerAddr m a b)
csMiniProtocolMap

                      case mCompletionMap of
                        -- there was an error when starting one of the
                        -- responders, we let the server continue without this
                        -- connection.
                        Maybe (Map MiniProtocolNum (STM m (Either SomeException b)))
Nothing -> Maybe
  (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
-> m (Maybe
        (ConnectionState muxMode initiatorCtx peerAddr versionData m a b))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe
  (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
forall a. Maybe a
Nothing

                        Just Map MiniProtocolNum (STM m (Either SomeException b))
csCompletionMap -> do
                          mv <- (DiffTime -> m (TVar m Bool))
-> Maybe DiffTime -> m (Maybe (TVar m Bool))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Maybe a -> f (Maybe b)
traverse DiffTime -> m (TVar m Bool)
forall (m :: * -> *). MonadTimer m => DiffTime -> m (TVar m Bool)
registerDelay Maybe DiffTime
idleTimeout
                          let -- initial state is 'RemoteIdle', if the remote end will not
                              -- start any responders this will unregister the inbound side.
                              csRemoteState :: RemoteState m
                              csRemoteState = STM m Bool -> RemoteState m
forall (m :: * -> *). STM m Bool -> RemoteState m
RemoteIdle (case Maybe (TVar m Bool)
mv of
                                                            Maybe (TVar m Bool)
Nothing -> Bool -> STM m Bool
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
False
                                                            Just TVar m Bool
v  -> TVar m Bool -> STM m Bool
forall a. TVar m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> STM m a
LazySTM.readTVar TVar m Bool
v)

                              connState = ConnectionState {
                                  Mux muxMode m
csMux :: Mux muxMode m
csMux :: Mux muxMode m
csMux,
                                  versionData
csVersionData :: versionData
csVersionData :: versionData
csVersionData,
                                  Map
  MiniProtocolNum
  (MiniProtocolData muxMode initiatorCtx peerAddr m a b)
csMiniProtocolMap :: Map
  MiniProtocolNum
  (MiniProtocolData muxMode initiatorCtx peerAddr m a b)
csMiniProtocolMap :: Map
  MiniProtocolNum
  (MiniProtocolData muxMode initiatorCtx peerAddr m a b)
csMiniProtocolMap,
                                  Map MiniProtocolNum (STM m (Either SomeException b))
csCompletionMap :: Map MiniProtocolNum (STM m (Either SomeException b))
csCompletionMap :: Map MiniProtocolNum (STM m (Either SomeException b))
csCompletionMap,
                                  RemoteState m
csRemoteState :: RemoteState m
csRemoteState :: RemoteState m
csRemoteState
                                }

                          return (Just connState)

                    -- inbound governor might be notified about a connection
                    -- which is already tracked.  In such case we preserve its
                    -- state.
                    --
                    -- In particular we preserve an ongoing timeout on
                    -- 'RemoteIdle' state.
                    Just ConnectionState muxMode initiatorCtx peerAddr versionData m a b
connState -> Maybe
  (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
-> m (Maybe
        (ConnectionState muxMode initiatorCtx peerAddr versionData m a b))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (ConnectionState muxMode initiatorCtx peerAddr versionData m a b
-> Maybe
     (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
forall a. a -> Maybe a
Just ConnectionState muxMode initiatorCtx peerAddr versionData m a b
connState)

                  )
                  ConnectionId peerAddr
connId
                  (State muxMode initiatorCtx peerAddr versionData m a b
-> Map
     (ConnectionId peerAddr)
     (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
forall (muxMode :: Mode) initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
State muxMode initiatorCtx peerAddr versionData m a b
-> Map
     (ConnectionId peerAddr)
     (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
connections State muxMode initiatorCtx peerAddr versionData m a b
state)

                -- update state and continue the recursive loop
                let state' = State muxMode initiatorCtx peerAddr versionData m a b
state {
                        connections,
                        freshDuplexPeers =
                          case dataFlow of
                            DataFlow
Unidirectional -> State muxMode initiatorCtx peerAddr versionData m a b
-> OrdPSQ peerAddr Time versionData
forall (muxMode :: Mode) initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
State muxMode initiatorCtx peerAddr versionData m a b
-> OrdPSQ peerAddr Time versionData
freshDuplexPeers State muxMode initiatorCtx peerAddr versionData m a b
state
                            DataFlow
Duplex         -> peerAddr
-> Time
-> versionData
-> OrdPSQ peerAddr Time versionData
-> OrdPSQ peerAddr Time versionData
forall k p v.
(Ord k, Ord p) =>
k -> p -> v -> OrdPSQ k p v -> OrdPSQ k p v
OrdPSQ.insert (ConnectionId peerAddr -> peerAddr
forall addr. ConnectionId addr -> addr
remoteAddress ConnectionId peerAddr
connId) Time
time versionData
csVersionData
                                                            (State muxMode initiatorCtx peerAddr versionData m a b
-> OrdPSQ peerAddr Time versionData
forall (muxMode :: Mode) initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
State muxMode initiatorCtx peerAddr versionData m a b
-> OrdPSQ peerAddr Time versionData
freshDuplexPeers State muxMode initiatorCtx peerAddr versionData m a b
state)
                      }
                return . Just $ StateWithPeerTransition state' connId

          MuxFinished ConnectionId peerAddr
connId STM m (Maybe SomeException)
result -> do

            merr <- STM m (Maybe SomeException) -> m (Maybe SomeException)
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically STM m (Maybe SomeException)
result
            case merr of
              Maybe SomeException
Nothing  -> Tracer m (Trace peerAddr) -> Trace peerAddr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (Trace peerAddr)
tracer (ConnectionId peerAddr -> Trace peerAddr
forall peerAddr. ConnectionId peerAddr -> Trace peerAddr
TrMuxCleanExit ConnectionId peerAddr
connId)
              Just SomeException
err -> Tracer m (Trace peerAddr) -> Trace peerAddr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (Trace peerAddr)
tracer (ConnectionId peerAddr -> SomeException -> Trace peerAddr
forall peerAddr.
ConnectionId peerAddr -> SomeException -> Trace peerAddr
TrMuxErrored ConnectionId peerAddr
connId SomeException
err)

            -- the connection manager does should realise this on itself.
            -- we bypass the assertion check since MuxFinished could have been
            -- placed on the queue by a racing thread before we managed
            -- to remove the connection from our state at the end of this loop.
            let state' = Bool
-> ConnectionId peerAddr
-> State muxMode initiatorCtx peerAddr versionData m a b
-> State muxMode initiatorCtx peerAddr versionData m a b
forall peerAddr (muxMode :: Mode) initiatorCtx versionData
       (m :: * -> *) a b.
Ord peerAddr =>
Bool
-> ConnectionId peerAddr
-> State muxMode initiatorCtx peerAddr versionData m a b
-> State muxMode initiatorCtx peerAddr versionData m a b
unregisterConnection Bool
True ConnectionId peerAddr
connId State muxMode initiatorCtx peerAddr versionData m a b
state
            return . Just $ StateWithPeerTransition state' connId -- ^ even though it might not be true, but it's benign

          MiniProtocolTerminated
            Terminated {
                ConnectionId peerAddr
tConnId :: ConnectionId peerAddr
tConnId :: forall (muxMode :: Mode) initiatorCtx peerAddr (m :: * -> *) a b.
Terminated muxMode initiatorCtx peerAddr m a b
-> ConnectionId peerAddr
tConnId,
                Mux muxMode m
tMux :: Mux muxMode m
tMux :: forall (muxMode :: Mode) initiatorCtx peerAddr (m :: * -> *) a b.
Terminated muxMode initiatorCtx peerAddr m a b -> Mux muxMode m
tMux,
                tMiniProtocolData :: forall (muxMode :: Mode) initiatorCtx peerAddr (m :: * -> *) a b.
Terminated muxMode initiatorCtx peerAddr m a b
-> MiniProtocolData muxMode initiatorCtx peerAddr m a b
tMiniProtocolData = mpd :: MiniProtocolData muxMode initiatorCtx peerAddr m a b
mpd@MiniProtocolData { mpdMiniProtocol :: forall (muxMode :: Mode) initiatorCtx peerAddr (m :: * -> *) a b.
MiniProtocolData muxMode initiatorCtx peerAddr m a b
-> MiniProtocol
     muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
mpdMiniProtocol = MiniProtocol
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
miniProtocol },
                STM m (Either SomeException b)
tResult :: STM m (Either SomeException b)
tResult :: forall (muxMode :: Mode) initiatorCtx peerAddr (m :: * -> *) a b.
Terminated muxMode initiatorCtx peerAddr m a b
-> STM m (Either SomeException b)
tResult
              } -> do
            tResult' <- STM m (Either SomeException b) -> m (Either SomeException b)
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically STM m (Either SomeException b)
tResult
            let num = MiniProtocol
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
-> MiniProtocolNum
forall (mode :: Mode) initiatorCtx responderCtx bytes (m :: * -> *)
       a b.
MiniProtocol mode initiatorCtx responderCtx bytes m a b
-> MiniProtocolNum
miniProtocolNum MiniProtocol
  muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
miniProtocol
            case tResult' of
              Left SomeException
e -> do
                -- a mini-protocol errored.  In this case mux will shutdown, and
                -- the connection manager will tear down the socket. Before bailing out,
                -- the IG tracer will emit BearState Dead which will unregister the connection
                -- in some following iteration via MuxFinished, but for this peer it should
                -- be the very next message.
                Tracer m (Trace peerAddr) -> Trace peerAddr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (Trace peerAddr)
tracer (Trace peerAddr -> m ()) -> Trace peerAddr -> m ()
forall a b. (a -> b) -> a -> b
$
                  ConnectionId peerAddr
-> MiniProtocolNum -> SomeException -> Trace peerAddr
forall peerAddr.
ConnectionId peerAddr
-> MiniProtocolNum -> SomeException -> Trace peerAddr
TrResponderErrored ConnectionId peerAddr
tConnId MiniProtocolNum
num SomeException
e
                Maybe
  (LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr))
-> m (Maybe
        (LoopDecision
           (State muxMode initiatorCtx peerAddr versionData m a b)
           (ConnectionId peerAddr)))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe
  (LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr))
forall a. Maybe a
Nothing

              Right b
_ ->
                Mux muxMode m
-> MiniProtocolData muxMode initiatorCtx peerAddr m a b
-> m (Either SomeException (STM m (Either SomeException b)))
forall (mode :: Mode) initiatorCtx peerAddr (m :: * -> *) a b.
(Alternative (STM m), HasResponder mode ~ 'True, MonadAsync m,
 MonadLabelledSTM m, MonadCatch m, MonadMask m,
 MonadThrow (STM m)) =>
Mux mode m
-> MiniProtocolData mode initiatorCtx peerAddr m a b
-> m (Either SomeException (STM m (Either SomeException b)))
runResponder Mux muxMode m
tMux MiniProtocolData muxMode initiatorCtx peerAddr m a b
mpd m (Either SomeException (STM m (Either SomeException b)))
-> (Either SomeException (STM m (Either SomeException b))
    -> m (Maybe
            (LoopDecision
               (State muxMode initiatorCtx peerAddr versionData m a b)
               (ConnectionId peerAddr))))
-> m (Maybe
        (LoopDecision
           (State muxMode initiatorCtx peerAddr versionData m a b)
           (ConnectionId peerAddr)))
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
                  Right STM m (Either SomeException b)
completionAction -> do
                    Tracer m (Trace peerAddr) -> Trace peerAddr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (Trace peerAddr)
tracer (ConnectionId peerAddr -> MiniProtocolNum -> Trace peerAddr
forall peerAddr.
ConnectionId peerAddr -> MiniProtocolNum -> Trace peerAddr
TrResponderRestarted ConnectionId peerAddr
tConnId MiniProtocolNum
num)
                    let state' :: State muxMode initiatorCtx peerAddr versionData m a b
state' = ConnectionId peerAddr
-> MiniProtocolNum
-> STM m (Either SomeException b)
-> State muxMode initiatorCtx peerAddr versionData m a b
-> State muxMode initiatorCtx peerAddr versionData m a b
forall peerAddr (m :: * -> *) b (muxMode :: Mode) initiatorCtx
       versionData a.
Ord peerAddr =>
ConnectionId peerAddr
-> MiniProtocolNum
-> STM m (Either SomeException b)
-> State muxMode initiatorCtx peerAddr versionData m a b
-> State muxMode initiatorCtx peerAddr versionData m a b
updateMiniProtocol ConnectionId peerAddr
tConnId MiniProtocolNum
num STM m (Either SomeException b)
completionAction State muxMode initiatorCtx peerAddr versionData m a b
state
                    Maybe
  (LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr))
-> m (Maybe
        (LoopDecision
           (State muxMode initiatorCtx peerAddr versionData m a b)
           (ConnectionId peerAddr)))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe
   (LoopDecision
      (State muxMode initiatorCtx peerAddr versionData m a b)
      (ConnectionId peerAddr))
 -> m (Maybe
         (LoopDecision
            (State muxMode initiatorCtx peerAddr versionData m a b)
            (ConnectionId peerAddr))))
-> (LoopDecision
      (State muxMode initiatorCtx peerAddr versionData m a b)
      (ConnectionId peerAddr)
    -> Maybe
         (LoopDecision
            (State muxMode initiatorCtx peerAddr versionData m a b)
            (ConnectionId peerAddr)))
-> LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr)
-> m (Maybe
        (LoopDecision
           (State muxMode initiatorCtx peerAddr versionData m a b)
           (ConnectionId peerAddr)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. LoopDecision
  (State muxMode initiatorCtx peerAddr versionData m a b)
  (ConnectionId peerAddr)
-> Maybe
     (LoopDecision
        (State muxMode initiatorCtx peerAddr versionData m a b)
        (ConnectionId peerAddr))
forall a. a -> Maybe a
Just (LoopDecision
   (State muxMode initiatorCtx peerAddr versionData m a b)
   (ConnectionId peerAddr)
 -> m (Maybe
         (LoopDecision
            (State muxMode initiatorCtx peerAddr versionData m a b)
            (ConnectionId peerAddr))))
-> LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr)
-> m (Maybe
        (LoopDecision
           (State muxMode initiatorCtx peerAddr versionData m a b)
           (ConnectionId peerAddr)))
forall a b. (a -> b) -> a -> b
$ State muxMode initiatorCtx peerAddr versionData m a b
-> LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr)
forall state peer. state -> LoopDecision state peer
OnlyStateChange State muxMode initiatorCtx peerAddr versionData m a b
state'

                  Left SomeException
err -> do
                    -- there is no way to recover from synchronous exceptions; we
                    -- stop mux which allows to close resources held by
                    -- connection manager.
                    Tracer m (Trace peerAddr) -> Trace peerAddr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (Trace peerAddr)
tracer (ConnectionId peerAddr
-> MiniProtocolNum -> SomeException -> Trace peerAddr
forall peerAddr.
ConnectionId peerAddr
-> MiniProtocolNum -> SomeException -> Trace peerAddr
TrResponderStartFailure ConnectionId peerAddr
tConnId MiniProtocolNum
num SomeException
err)
                    Mux muxMode m -> m ()
forall (m :: * -> *) (mode :: Mode).
MonadSTM m =>
Mux mode m -> m ()
Mux.stop Mux muxMode m
tMux
                    Maybe
  (LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr))
-> m (Maybe
        (LoopDecision
           (State muxMode initiatorCtx peerAddr versionData m a b)
           (ConnectionId peerAddr)))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe
  (LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr))
forall a. Maybe a
Nothing

          WaitIdleRemote ConnectionId peerAddr
connId -> do
            -- @
            --    DemotedToCold^{dataFlow}_{Remote} : InboundState Duplex
            --                                      → InboundIdleState Duplex
            -- @
            -- NOTE: `demotedToColdRemote` doesn't throw, hence exception handling
            -- is not needed.
            res <- ConnectionManager muxMode socket peerAddr handle handleError m
-> ConnectionId peerAddr -> m (OperationResult AbstractState)
forall (muxMode :: Mode) socket peerAddr handle handleError
       (m :: * -> *).
(HasResponder muxMode ~ 'True) =>
ConnectionManager muxMode socket peerAddr handle handleError m
-> ConnectionId peerAddr -> m (OperationResult AbstractState)
demotedToColdRemote ConnectionManager muxMode socket peerAddr handle handleError m
connectionManager ConnectionId peerAddr
connId
            traceWith tracer (TrWaitIdleRemote connId res)
            case res of
              OperationSuccess {}  -> do
                mv <- (DiffTime -> m (TVar m Bool))
-> Maybe DiffTime -> m (Maybe (TVar m Bool))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Maybe a -> f (Maybe b)
traverse DiffTime -> m (TVar m Bool)
forall (m :: * -> *). MonadTimer m => DiffTime -> m (TVar m Bool)
registerDelay Maybe DiffTime
idleTimeout
                let timeoutSTM :: STM m Bool
                    !timeoutSTM = case Maybe (TVar m Bool)
mv of
                      Maybe (TVar m Bool)
Nothing -> Bool -> STM m Bool
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
False
                      Just TVar m Bool
v  -> TVar m Bool -> STM m Bool
forall a. TVar m a -> STM m a
forall (m :: * -> *) a. MonadSTM m => TVar m a -> STM m a
LazySTM.readTVar TVar m Bool
v

                    state' = ConnectionId peerAddr
-> RemoteState m
-> State muxMode initiatorCtx peerAddr versionData m a b
-> State muxMode initiatorCtx peerAddr versionData m a b
forall peerAddr (m :: * -> *) (muxMode :: Mode) initiatorCtx
       versionData a b.
Ord peerAddr =>
ConnectionId peerAddr
-> RemoteState m
-> State muxMode initiatorCtx peerAddr versionData m a b
-> State muxMode initiatorCtx peerAddr versionData m a b
updateRemoteState ConnectionId peerAddr
connId (STM m Bool -> RemoteState m
forall (m :: * -> *). STM m Bool -> RemoteState m
RemoteIdle STM m Bool
timeoutSTM) State muxMode initiatorCtx peerAddr versionData m a b
state

                return . Just $ StateWithPeerTransition state' connId
              -- if the connection handler failed by this time, it will have
              -- written BearerState Dead to the IG tracer and we will handle this
              -- in MuxFinished case on the next iteration, where it will unregister
              -- the connection
              OperationResult AbstractState
_otherwise -> Maybe
  (LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr))
-> m (Maybe
        (LoopDecision
           (State muxMode initiatorCtx peerAddr versionData m a b)
           (ConnectionId peerAddr)))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe
  (LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr))
forall a. Maybe a
Nothing

          -- @
          --    PromotedToWarm^{Duplex}_{Remote}
          -- @
          -- or
          -- @
          --    Awake^{dataFlow}_{Remote}
          -- @
          --
          -- Note: the 'AwakeRemote' is detected as soon as mux detects any
          -- traffic.  This means that we'll observe this transition also if the
          -- first message that arrives is terminating a mini-protocol.
          AwakeRemote ConnectionId peerAddr
connId -> do
            -- notify the connection manager about the transition
            --
            -- NOTE: `promotedToWarmRemote` doesn't throw, hence exception handling
            -- is not needed.
            res <- ConnectionManager muxMode socket peerAddr handle handleError m
-> ConnectionId peerAddr -> m (OperationResult AbstractState)
forall (muxMode :: Mode) socket peerAddr handle handleError
       (m :: * -> *).
(HasResponder muxMode ~ 'True) =>
ConnectionManager muxMode socket peerAddr handle handleError m
-> ConnectionId peerAddr -> m (OperationResult AbstractState)
promotedToWarmRemote ConnectionManager muxMode socket peerAddr handle handleError m
connectionManager ConnectionId peerAddr
connId
            traceWith tracer (TrPromotedToWarmRemote connId res)

            let state' = ConnectionId peerAddr
-> RemoteState m
-> State muxMode initiatorCtx peerAddr versionData m a b
-> State muxMode initiatorCtx peerAddr versionData m a b
forall peerAddr (m :: * -> *) (muxMode :: Mode) initiatorCtx
       versionData a b.
Ord peerAddr =>
ConnectionId peerAddr
-> RemoteState m
-> State muxMode initiatorCtx peerAddr versionData m a b
-> State muxMode initiatorCtx peerAddr versionData m a b
updateRemoteState
                           ConnectionId peerAddr
connId
                           RemoteState m
forall (m :: * -> *). RemoteState m
RemoteWarm
                           State muxMode initiatorCtx peerAddr versionData m a b
state
            return . Just $ StateWithPeerTransition state' connId

          RemotePromotedToHot ConnectionId peerAddr
connId -> do
            Tracer m (Trace peerAddr) -> Trace peerAddr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (Trace peerAddr)
tracer (ConnectionId peerAddr -> Trace peerAddr
forall peerAddr. ConnectionId peerAddr -> Trace peerAddr
TrPromotedToHotRemote ConnectionId peerAddr
connId)
            let state' :: State muxMode initiatorCtx peerAddr versionData m a b
state' = ConnectionId peerAddr
-> RemoteState m
-> State muxMode initiatorCtx peerAddr versionData m a b
-> State muxMode initiatorCtx peerAddr versionData m a b
forall peerAddr (m :: * -> *) (muxMode :: Mode) initiatorCtx
       versionData a b.
Ord peerAddr =>
ConnectionId peerAddr
-> RemoteState m
-> State muxMode initiatorCtx peerAddr versionData m a b
-> State muxMode initiatorCtx peerAddr versionData m a b
updateRemoteState ConnectionId peerAddr
connId RemoteState m
forall (m :: * -> *). RemoteState m
RemoteHot State muxMode initiatorCtx peerAddr versionData m a b
state
            Maybe
  (LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr))
-> m (Maybe
        (LoopDecision
           (State muxMode initiatorCtx peerAddr versionData m a b)
           (ConnectionId peerAddr)))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe
   (LoopDecision
      (State muxMode initiatorCtx peerAddr versionData m a b)
      (ConnectionId peerAddr))
 -> m (Maybe
         (LoopDecision
            (State muxMode initiatorCtx peerAddr versionData m a b)
            (ConnectionId peerAddr))))
-> (LoopDecision
      (State muxMode initiatorCtx peerAddr versionData m a b)
      (ConnectionId peerAddr)
    -> Maybe
         (LoopDecision
            (State muxMode initiatorCtx peerAddr versionData m a b)
            (ConnectionId peerAddr)))
-> LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr)
-> m (Maybe
        (LoopDecision
           (State muxMode initiatorCtx peerAddr versionData m a b)
           (ConnectionId peerAddr)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. LoopDecision
  (State muxMode initiatorCtx peerAddr versionData m a b)
  (ConnectionId peerAddr)
-> Maybe
     (LoopDecision
        (State muxMode initiatorCtx peerAddr versionData m a b)
        (ConnectionId peerAddr))
forall a. a -> Maybe a
Just (LoopDecision
   (State muxMode initiatorCtx peerAddr versionData m a b)
   (ConnectionId peerAddr)
 -> m (Maybe
         (LoopDecision
            (State muxMode initiatorCtx peerAddr versionData m a b)
            (ConnectionId peerAddr))))
-> LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr)
-> m (Maybe
        (LoopDecision
           (State muxMode initiatorCtx peerAddr versionData m a b)
           (ConnectionId peerAddr)))
forall a b. (a -> b) -> a -> b
$ State muxMode initiatorCtx peerAddr versionData m a b
-> ConnectionId peerAddr
-> LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr)
forall state peer. state -> peer -> LoopDecision state peer
StateWithPeerTransition State muxMode initiatorCtx peerAddr versionData m a b
state' ConnectionId peerAddr
connId

          RemoteDemotedToWarm ConnectionId peerAddr
connId -> do
            Tracer m (Trace peerAddr) -> Trace peerAddr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (Trace peerAddr)
tracer (ConnectionId peerAddr -> Trace peerAddr
forall peerAddr. ConnectionId peerAddr -> Trace peerAddr
TrDemotedToWarmRemote ConnectionId peerAddr
connId)
            let state' :: State muxMode initiatorCtx peerAddr versionData m a b
state' = ConnectionId peerAddr
-> RemoteState m
-> State muxMode initiatorCtx peerAddr versionData m a b
-> State muxMode initiatorCtx peerAddr versionData m a b
forall peerAddr (m :: * -> *) (muxMode :: Mode) initiatorCtx
       versionData a b.
Ord peerAddr =>
ConnectionId peerAddr
-> RemoteState m
-> State muxMode initiatorCtx peerAddr versionData m a b
-> State muxMode initiatorCtx peerAddr versionData m a b
updateRemoteState ConnectionId peerAddr
connId RemoteState m
forall (m :: * -> *). RemoteState m
RemoteWarm State muxMode initiatorCtx peerAddr versionData m a b
state
            Maybe
  (LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr))
-> m (Maybe
        (LoopDecision
           (State muxMode initiatorCtx peerAddr versionData m a b)
           (ConnectionId peerAddr)))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe
   (LoopDecision
      (State muxMode initiatorCtx peerAddr versionData m a b)
      (ConnectionId peerAddr))
 -> m (Maybe
         (LoopDecision
            (State muxMode initiatorCtx peerAddr versionData m a b)
            (ConnectionId peerAddr))))
-> (LoopDecision
      (State muxMode initiatorCtx peerAddr versionData m a b)
      (ConnectionId peerAddr)
    -> Maybe
         (LoopDecision
            (State muxMode initiatorCtx peerAddr versionData m a b)
            (ConnectionId peerAddr)))
-> LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr)
-> m (Maybe
        (LoopDecision
           (State muxMode initiatorCtx peerAddr versionData m a b)
           (ConnectionId peerAddr)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. LoopDecision
  (State muxMode initiatorCtx peerAddr versionData m a b)
  (ConnectionId peerAddr)
-> Maybe
     (LoopDecision
        (State muxMode initiatorCtx peerAddr versionData m a b)
        (ConnectionId peerAddr))
forall a. a -> Maybe a
Just (LoopDecision
   (State muxMode initiatorCtx peerAddr versionData m a b)
   (ConnectionId peerAddr)
 -> m (Maybe
         (LoopDecision
            (State muxMode initiatorCtx peerAddr versionData m a b)
            (ConnectionId peerAddr))))
-> LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr)
-> m (Maybe
        (LoopDecision
           (State muxMode initiatorCtx peerAddr versionData m a b)
           (ConnectionId peerAddr)))
forall a b. (a -> b) -> a -> b
$ State muxMode initiatorCtx peerAddr versionData m a b
-> ConnectionId peerAddr
-> LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr)
forall state peer. state -> peer -> LoopDecision state peer
StateWithPeerTransition State muxMode initiatorCtx peerAddr versionData m a b
state' ConnectionId peerAddr
connId

          CommitRemote ConnectionId peerAddr
connId -> do
            -- NOTE: `releaseInboundConnection` doesn't throw, hence exception
            -- handling is not needed.
            res <- ConnectionManager muxMode socket peerAddr handle handleError m
-> ConnectionId peerAddr
-> m (OperationResult DemotedToColdRemoteTr)
forall (muxMode :: Mode) socket peerAddr handle handleError
       (m :: * -> *).
(HasResponder muxMode ~ 'True) =>
ConnectionManager muxMode socket peerAddr handle handleError m
-> ConnectionId peerAddr
-> m (OperationResult DemotedToColdRemoteTr)
releaseInboundConnection ConnectionManager muxMode socket peerAddr handle handleError m
connectionManager ConnectionId peerAddr
connId
            traceWith tracer $ TrDemotedToColdRemote connId res
            case res of
              OperationSuccess DemotedToColdRemoteTr
transition ->
                case DemotedToColdRemoteTr
transition of
                  -- the following two cases are when the connection was not used
                  -- by p2p-governor, the connection will be closed.
                  DemotedToColdRemoteTr
CommitTr -> do
                    -- @
                    --    Commit^{dataFlow}_{Remote} : InboundIdleState dataFlow
                    --                               → TerminatingState
                    -- @
                    let state' :: State muxMode initiatorCtx peerAddr versionData m a b
state' = Bool
-> ConnectionId peerAddr
-> State muxMode initiatorCtx peerAddr versionData m a b
-> State muxMode initiatorCtx peerAddr versionData m a b
forall peerAddr (muxMode :: Mode) initiatorCtx versionData
       (m :: * -> *) a b.
Ord peerAddr =>
Bool
-> ConnectionId peerAddr
-> State muxMode initiatorCtx peerAddr versionData m a b
-> State muxMode initiatorCtx peerAddr versionData m a b
unregisterConnection Bool
False ConnectionId peerAddr
connId State muxMode initiatorCtx peerAddr versionData m a b
state
                    Maybe
  (LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr))
-> m (Maybe
        (LoopDecision
           (State muxMode initiatorCtx peerAddr versionData m a b)
           (ConnectionId peerAddr)))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe
   (LoopDecision
      (State muxMode initiatorCtx peerAddr versionData m a b)
      (ConnectionId peerAddr))
 -> m (Maybe
         (LoopDecision
            (State muxMode initiatorCtx peerAddr versionData m a b)
            (ConnectionId peerAddr))))
-> (LoopDecision
      (State muxMode initiatorCtx peerAddr versionData m a b)
      (ConnectionId peerAddr)
    -> Maybe
         (LoopDecision
            (State muxMode initiatorCtx peerAddr versionData m a b)
            (ConnectionId peerAddr)))
-> LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr)
-> m (Maybe
        (LoopDecision
           (State muxMode initiatorCtx peerAddr versionData m a b)
           (ConnectionId peerAddr)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. LoopDecision
  (State muxMode initiatorCtx peerAddr versionData m a b)
  (ConnectionId peerAddr)
-> Maybe
     (LoopDecision
        (State muxMode initiatorCtx peerAddr versionData m a b)
        (ConnectionId peerAddr))
forall a. a -> Maybe a
Just (LoopDecision
   (State muxMode initiatorCtx peerAddr versionData m a b)
   (ConnectionId peerAddr)
 -> m (Maybe
         (LoopDecision
            (State muxMode initiatorCtx peerAddr versionData m a b)
            (ConnectionId peerAddr))))
-> LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr)
-> m (Maybe
        (LoopDecision
           (State muxMode initiatorCtx peerAddr versionData m a b)
           (ConnectionId peerAddr)))
forall a b. (a -> b) -> a -> b
$ State muxMode initiatorCtx peerAddr versionData m a b
-> ConnectionId peerAddr
-> LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr)
forall state peer. state -> peer -> LoopDecision state peer
StateWithPeerTransition State muxMode initiatorCtx peerAddr versionData m a b
state' ConnectionId peerAddr
connId

                  -- the connection is still used by p2p-governor, carry on but put
                  -- it in 'RemoteCold' state.  This will ensure we keep ready to
                  -- serve the peer.
                  -- @
                  --    DemotedToCold^{Duplex}_{Remote} : DuplexState
                  --                                    → OutboundState Duplex
                  -- @
                  -- or
                  -- @
                  --    Awake^{Duplex}^{Local} : InboundIdleState Duplex
                  --                           → OutboundState Duplex
                  -- @
                  --
                  -- note: the latter transition is level triggered rather than
                  -- edge triggered. The server state is updated once protocol
                  -- idleness expires rather than as soon as the connection
                  -- manager was requested outbound connection.
                  DemotedToColdRemoteTr
KeepTr -> do
                    let state' :: State muxMode initiatorCtx peerAddr versionData m a b
state' = ConnectionId peerAddr
-> RemoteState m
-> State muxMode initiatorCtx peerAddr versionData m a b
-> State muxMode initiatorCtx peerAddr versionData m a b
forall peerAddr (m :: * -> *) (muxMode :: Mode) initiatorCtx
       versionData a b.
Ord peerAddr =>
ConnectionId peerAddr
-> RemoteState m
-> State muxMode initiatorCtx peerAddr versionData m a b
-> State muxMode initiatorCtx peerAddr versionData m a b
updateRemoteState ConnectionId peerAddr
connId RemoteState m
forall (m :: * -> *). RemoteState m
RemoteCold State muxMode initiatorCtx peerAddr versionData m a b
state
                    Maybe
  (LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr))
-> m (Maybe
        (LoopDecision
           (State muxMode initiatorCtx peerAddr versionData m a b)
           (ConnectionId peerAddr)))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe
   (LoopDecision
      (State muxMode initiatorCtx peerAddr versionData m a b)
      (ConnectionId peerAddr))
 -> m (Maybe
         (LoopDecision
            (State muxMode initiatorCtx peerAddr versionData m a b)
            (ConnectionId peerAddr))))
-> (LoopDecision
      (State muxMode initiatorCtx peerAddr versionData m a b)
      (ConnectionId peerAddr)
    -> Maybe
         (LoopDecision
            (State muxMode initiatorCtx peerAddr versionData m a b)
            (ConnectionId peerAddr)))
-> LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr)
-> m (Maybe
        (LoopDecision
           (State muxMode initiatorCtx peerAddr versionData m a b)
           (ConnectionId peerAddr)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. LoopDecision
  (State muxMode initiatorCtx peerAddr versionData m a b)
  (ConnectionId peerAddr)
-> Maybe
     (LoopDecision
        (State muxMode initiatorCtx peerAddr versionData m a b)
        (ConnectionId peerAddr))
forall a. a -> Maybe a
Just (LoopDecision
   (State muxMode initiatorCtx peerAddr versionData m a b)
   (ConnectionId peerAddr)
 -> m (Maybe
         (LoopDecision
            (State muxMode initiatorCtx peerAddr versionData m a b)
            (ConnectionId peerAddr))))
-> LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr)
-> m (Maybe
        (LoopDecision
           (State muxMode initiatorCtx peerAddr versionData m a b)
           (ConnectionId peerAddr)))
forall a b. (a -> b) -> a -> b
$ State muxMode initiatorCtx peerAddr versionData m a b
-> ConnectionId peerAddr
-> LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr)
forall state peer. state -> peer -> LoopDecision state peer
StateWithPeerTransition State muxMode initiatorCtx peerAddr versionData m a b
state' ConnectionId peerAddr
connId

              OperationResult DemotedToColdRemoteTr
_otherwise -> Maybe
  (LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr))
-> m (Maybe
        (LoopDecision
           (State muxMode initiatorCtx peerAddr versionData m a b)
           (ConnectionId peerAddr)))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe
  (LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr))
forall a. Maybe a
Nothing

          MaturedDuplexPeers Map peerAddr versionData
newMatureDuplexPeers OrdPSQ peerAddr Time versionData
freshDuplexPeers -> do
            Tracer m (Trace peerAddr) -> Trace peerAddr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (Trace peerAddr)
tracer (Trace peerAddr -> m ()) -> Trace peerAddr -> m ()
forall a b. (a -> b) -> a -> b
$ Set peerAddr -> Set peerAddr -> Trace peerAddr
forall peerAddr. Set peerAddr -> Set peerAddr -> Trace peerAddr
TrMaturedConnections (Map peerAddr versionData -> Set peerAddr
forall k a. Map k a -> Set k
Map.keysSet Map peerAddr versionData
newMatureDuplexPeers)
                                                    ([peerAddr] -> Set peerAddr
forall a. Ord a => [a] -> Set a
Set.fromList ([peerAddr] -> Set peerAddr) -> [peerAddr] -> Set peerAddr
forall a b. (a -> b) -> a -> b
$ OrdPSQ peerAddr Time versionData -> [peerAddr]
forall k p v. OrdPSQ k p v -> [k]
OrdPSQ.keys OrdPSQ peerAddr Time versionData
freshDuplexPeers)
            Maybe
  (LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr))
-> m (Maybe
        (LoopDecision
           (State muxMode initiatorCtx peerAddr versionData m a b)
           (ConnectionId peerAddr)))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe
   (LoopDecision
      (State muxMode initiatorCtx peerAddr versionData m a b)
      (ConnectionId peerAddr))
 -> m (Maybe
         (LoopDecision
            (State muxMode initiatorCtx peerAddr versionData m a b)
            (ConnectionId peerAddr))))
-> (LoopDecision
      (State muxMode initiatorCtx peerAddr versionData m a b)
      (ConnectionId peerAddr)
    -> Maybe
         (LoopDecision
            (State muxMode initiatorCtx peerAddr versionData m a b)
            (ConnectionId peerAddr)))
-> LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr)
-> m (Maybe
        (LoopDecision
           (State muxMode initiatorCtx peerAddr versionData m a b)
           (ConnectionId peerAddr)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. LoopDecision
  (State muxMode initiatorCtx peerAddr versionData m a b)
  (ConnectionId peerAddr)
-> Maybe
     (LoopDecision
        (State muxMode initiatorCtx peerAddr versionData m a b)
        (ConnectionId peerAddr))
forall a. a -> Maybe a
Just (LoopDecision
   (State muxMode initiatorCtx peerAddr versionData m a b)
   (ConnectionId peerAddr)
 -> m (Maybe
         (LoopDecision
            (State muxMode initiatorCtx peerAddr versionData m a b)
            (ConnectionId peerAddr))))
-> LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr)
-> m (Maybe
        (LoopDecision
           (State muxMode initiatorCtx peerAddr versionData m a b)
           (ConnectionId peerAddr)))
forall a b. (a -> b) -> a -> b
$ State muxMode initiatorCtx peerAddr versionData m a b
-> LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr)
forall state peer. state -> LoopDecision state peer
OnlyStateChange State muxMode initiatorCtx peerAddr versionData m a b
state { matureDuplexPeers = newMatureDuplexPeers
                                                               <> matureDuplexPeers state,
                                                    freshDuplexPeers }

          Event
  muxMode
  (Handle
     muxMode
     initiatorCtx
     (ResponderContext peerAddr)
     versionData
     ByteString
     m
     a
     b)
  initiatorCtx
  peerAddr
  versionData
  m
  a
  b
InactivityTimeout -> do
            Tracer m (Trace peerAddr) -> Trace peerAddr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (Trace peerAddr)
tracer (Trace peerAddr -> m ()) -> Trace peerAddr -> m ()
forall a b. (a -> b) -> a -> b
$ [(peerAddr, Time)] -> Trace peerAddr
forall peerAddr. [(peerAddr, Time)] -> Trace peerAddr
TrInactive ((\(peerAddr
a,Time
b,versionData
_) -> (peerAddr
a,Time
b)) ((peerAddr, Time, versionData) -> (peerAddr, Time))
-> [(peerAddr, Time, versionData)] -> [(peerAddr, Time)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> OrdPSQ peerAddr Time versionData -> [(peerAddr, Time, versionData)]
forall k p v. OrdPSQ k p v -> [(k, p, v)]
OrdPSQ.toList (State muxMode initiatorCtx peerAddr versionData m a b
-> OrdPSQ peerAddr Time versionData
forall (muxMode :: Mode) initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
State muxMode initiatorCtx peerAddr versionData m a b
-> OrdPSQ peerAddr Time versionData
freshDuplexPeers State muxMode initiatorCtx peerAddr versionData m a b
state))
            Maybe
  (LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr))
-> m (Maybe
        (LoopDecision
           (State muxMode initiatorCtx peerAddr versionData m a b)
           (ConnectionId peerAddr)))
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe
  (LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr))
forall a. Maybe a
Nothing

        mask_ $ do
          case decision of
            Just (OnlyStateChange State muxMode initiatorCtx peerAddr versionData m a b
state') -> do
              STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTVar
  m (State muxMode initiatorCtx peerAddr versionData m a b)
-> State muxMode initiatorCtx peerAddr versionData m a b
-> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar
  m (State muxMode initiatorCtx peerAddr versionData m a b)
stateVar State muxMode initiatorCtx peerAddr versionData m a b
state'
              Tracer m (Debug peerAddr versionData)
-> Debug peerAddr versionData -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (Debug peerAddr versionData)
debugTracer (State muxMode initiatorCtx peerAddr versionData m a b
-> Debug peerAddr versionData
forall peerAddr versionData (muxMode :: Mode) initiatorCtx
       (m :: * -> *) a b.
State muxMode initiatorCtx peerAddr versionData m a b
-> Debug peerAddr versionData
Debug State muxMode initiatorCtx peerAddr versionData m a b
state')
            Just (StateWithPeerTransition State muxMode initiatorCtx peerAddr versionData m a b
state' ConnectionId peerAddr
p) -> do
              STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTVar
  m (State muxMode initiatorCtx peerAddr versionData m a b)
-> State muxMode initiatorCtx peerAddr versionData m a b
-> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar
  m (State muxMode initiatorCtx peerAddr versionData m a b)
stateVar State muxMode initiatorCtx peerAddr versionData m a b
state'
              Tracer m (Debug peerAddr versionData)
-> Debug peerAddr versionData -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (Debug peerAddr versionData)
debugTracer (State muxMode initiatorCtx peerAddr versionData m a b
-> Debug peerAddr versionData
forall peerAddr versionData (muxMode :: Mode) initiatorCtx
       (m :: * -> *) a b.
State muxMode initiatorCtx peerAddr versionData m a b
-> Debug peerAddr versionData
Debug State muxMode initiatorCtx peerAddr versionData m a b
state')
              Tracer m (RemoteTransitionTrace peerAddr)
-> RemoteTransitionTrace peerAddr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (RemoteTransitionTrace peerAddr)
trTracer (ConnectionId peerAddr
-> State muxMode initiatorCtx peerAddr versionData m a b
-> State muxMode initiatorCtx peerAddr versionData m a b
-> RemoteTransitionTrace peerAddr
forall peerAddr (muxMode :: Mode) initiatorCtx versionData
       (m :: * -> *) a b.
Ord peerAddr =>
ConnectionId peerAddr
-> State muxMode initiatorCtx peerAddr versionData m a b
-> State muxMode initiatorCtx peerAddr versionData m a b
-> RemoteTransitionTrace peerAddr
mkRemoteTransitionTrace ConnectionId peerAddr
p State muxMode initiatorCtx peerAddr versionData m a b
state State muxMode initiatorCtx peerAddr versionData m a b
state')
            Maybe
  (LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr))
_otherwise -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

        case decision of
          Maybe
  (LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr))
_ | Just State muxMode initiatorCtx peerAddr versionData m a b
state' <- Maybe (State muxMode initiatorCtx peerAddr versionData m a b)
withState -> do
                (Counters -> Trace peerAddr)
-> Tracer m (Trace peerAddr) -> Cache Counters -> Counters -> m ()
forall (m :: * -> *) a b.
(Applicative m, Eq a) =>
(a -> b) -> Tracer m b -> Cache a -> a -> m ()
mapTraceWithCache Counters -> Trace peerAddr
forall peerAddr. Counters -> Trace peerAddr
TrInboundGovernorCounters
                                  Tracer m (Trace peerAddr)
tracer
                                  (State muxMode initiatorCtx peerAddr versionData m a b
-> Cache Counters
forall (muxMode :: Mode) initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
State muxMode initiatorCtx peerAddr versionData m a b
-> Cache Counters
countersCache State muxMode initiatorCtx peerAddr versionData m a b
state')
                                  (State muxMode initiatorCtx peerAddr versionData m a b -> Counters
forall (muxMode :: Mode) initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
State muxMode initiatorCtx peerAddr versionData m a b -> Counters
counters State muxMode initiatorCtx peerAddr versionData m a b
state')
                Tracer m (Trace peerAddr) -> Trace peerAddr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (Trace peerAddr)
tracer (Trace peerAddr -> m ()) -> Trace peerAddr -> m ()
forall a b. (a -> b) -> a -> b
$ Map (ConnectionId peerAddr) RemoteSt -> Trace peerAddr
forall peerAddr.
Map (ConnectionId peerAddr) RemoteSt -> Trace peerAddr
TrRemoteState (Map (ConnectionId peerAddr) RemoteSt -> Trace peerAddr)
-> Map (ConnectionId peerAddr) RemoteSt -> Trace peerAddr
forall a b. (a -> b) -> a -> b
$
                      RemoteState m -> RemoteSt
forall (m :: * -> *). RemoteState m -> RemoteSt
mkRemoteSt (RemoteState m -> RemoteSt)
-> (ConnectionState muxMode initiatorCtx peerAddr versionData m a b
    -> RemoteState m)
-> ConnectionState muxMode initiatorCtx peerAddr versionData m a b
-> RemoteSt
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionState muxMode initiatorCtx peerAddr versionData m a b
-> RemoteState m
forall (muxMode :: Mode) initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
ConnectionState muxMode initiatorCtx peerAddr versionData m a b
-> RemoteState m
csRemoteState
                  (ConnectionState muxMode initiatorCtx peerAddr versionData m a b
 -> RemoteSt)
-> Map
     (ConnectionId peerAddr)
     (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
-> Map (ConnectionId peerAddr) RemoteSt
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> State muxMode initiatorCtx peerAddr versionData m a b
-> Map
     (ConnectionId peerAddr)
     (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
forall (muxMode :: Mode) initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
State muxMode initiatorCtx peerAddr versionData m a b
-> Map
     (ConnectionId peerAddr)
     (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
connections State muxMode initiatorCtx peerAddr versionData m a b
state'

                -- Update Inbound Governor Counters cache values
                let newCounters :: Counters
newCounters       = State muxMode initiatorCtx peerAddr versionData m a b -> Counters
forall (muxMode :: Mode) initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
State muxMode initiatorCtx peerAddr versionData m a b -> Counters
counters State muxMode initiatorCtx peerAddr versionData m a b
state'
                    Cache Counters
oldCounters = State muxMode initiatorCtx peerAddr versionData m a b
-> Cache Counters
forall (muxMode :: Mode) initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
State muxMode initiatorCtx peerAddr versionData m a b
-> Cache Counters
countersCache State muxMode initiatorCtx peerAddr versionData m a b
state'
                    state'' :: State muxMode initiatorCtx peerAddr versionData m a b
state'' | Counters
newCounters Counters -> Counters -> Bool
forall a. Eq a => a -> a -> Bool
/= Counters
oldCounters = State muxMode initiatorCtx peerAddr versionData m a b
state' { countersCache = Cache newCounters }
                            | Bool
otherwise                 = State muxMode initiatorCtx peerAddr versionData m a b
state'

                STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTVar
  m (State muxMode initiatorCtx peerAddr versionData m a b)
-> State muxMode initiatorCtx peerAddr versionData m a b
-> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar
  m (State muxMode initiatorCtx peerAddr versionData m a b)
stateVar State muxMode initiatorCtx peerAddr versionData m a b
state''
            where
              withState :: Maybe (State muxMode initiatorCtx peerAddr versionData m a b)
withState = case Maybe
  (LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr))
decision of
                Just (OnlyStateChange State muxMode initiatorCtx peerAddr versionData m a b
s)            -> State muxMode initiatorCtx peerAddr versionData m a b
-> Maybe (State muxMode initiatorCtx peerAddr versionData m a b)
forall a. a -> Maybe a
Just State muxMode initiatorCtx peerAddr versionData m a b
s
                Just (StateWithPeerTransition State muxMode initiatorCtx peerAddr versionData m a b
s ConnectionId peerAddr
_p) -> State muxMode initiatorCtx peerAddr versionData m a b
-> Maybe (State muxMode initiatorCtx peerAddr versionData m a b)
forall a. a -> Maybe a
Just State muxMode initiatorCtx peerAddr versionData m a b
s
                Maybe
  (LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr))
_otherwise                          -> Maybe (State muxMode initiatorCtx peerAddr versionData m a b)
forall a. Maybe a
Nothing

          Maybe
  (LoopDecision
     (State muxMode initiatorCtx peerAddr versionData m a b)
     (ConnectionId peerAddr))
_otherwise -> () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()

-- | The tracer embedded with the mux tracer by the connection handler
-- for inbound or outbound duplex connections for efficient tracking
-- of inbound governor transitions for a given peer
--
inboundGovernorMuxTracer
  :: (MonadSTM m, Ord peerAddr)
  => InboundGovernorInfoChannel muxMode initiatorCtx peerAddr versionData ByteString m a b
  -> (versionData -> DataFlow)
  -> StrictTVar m (State muxMode initiatorCtx peerAddr versionData m a b)
  -> StrictTVar m Bool
  -> StrictTVar m (StrictMaybe ResponderCounters)
  -> Tracer m (Mux.WithBearer (ConnectionId peerAddr) Mux.Trace)
inboundGovernorMuxTracer :: forall (m :: * -> *) peerAddr (muxMode :: Mode) initiatorCtx
       versionData a b.
(MonadSTM m, Ord peerAddr) =>
InboundGovernorInfoChannel
  muxMode initiatorCtx peerAddr versionData ByteString m a b
-> (versionData -> DataFlow)
-> StrictTVar
     m (State muxMode initiatorCtx peerAddr versionData m a b)
-> StrictTVar m Bool
-> StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace)
inboundGovernorMuxTracer InboundGovernorInfoChannel
  muxMode initiatorCtx peerAddr versionData ByteString m a b
infoChannel versionData -> DataFlow
connectionDataFlow StrictTVar
  m (State muxMode initiatorCtx peerAddr versionData m a b)
stateVar StrictTVar m Bool
activeVar StrictTVar m (StrictMaybe ResponderCounters)
countersVar =
  (WithBearer (ConnectionId peerAddr) Trace -> m ())
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace)
forall (m :: * -> *) a. (a -> m ()) -> Tracer m a
Tracer \(Mux.WithBearer ConnectionId peerAddr
peer Trace
trace) -> do
    -- hello from muxer main thread
    -- code here is running in the context of the connection handler/muxer
    -- so care must be taken not to deadlock ourselves
    active <- StrictTVar m Bool -> m Bool
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> m a
readTVarIO StrictTVar m Bool
activeVar
    case (trace, active) of
      (Trace
_, Bool
True) | Just MiniProtocolNum
miniProtocolNum <- Trace -> Maybe MiniProtocolNum
miniProtocolStarted Trace
trace -> STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically do
        connections <- State muxMode initiatorCtx peerAddr versionData m a b
-> Map
     (ConnectionId peerAddr)
     (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
forall (muxMode :: Mode) initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
State muxMode initiatorCtx peerAddr versionData m a b
-> Map
     (ConnectionId peerAddr)
     (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
connections (State muxMode initiatorCtx peerAddr versionData m a b
 -> Map
      (ConnectionId peerAddr)
      (ConnectionState muxMode initiatorCtx peerAddr versionData m a b))
-> STM m (State muxMode initiatorCtx peerAddr versionData m a b)
-> STM
     m
     (Map
        (ConnectionId peerAddr)
        (ConnectionState muxMode initiatorCtx peerAddr versionData m a b))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StrictTVar
  m (State muxMode initiatorCtx peerAddr versionData m a b)
-> STM m (State muxMode initiatorCtx peerAddr versionData m a b)
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar
  m (State muxMode initiatorCtx peerAddr versionData m a b)
stateVar
        mCounters   <- readTVar countersVar
        case (Map.lookup peer connections, mCounters) of
          (Just (ConnectionState { RemoteState m
csRemoteState :: forall (muxMode :: Mode) initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
ConnectionState muxMode initiatorCtx peerAddr versionData m a b
-> RemoteState m
csRemoteState :: RemoteState m
csRemoteState, Map
  MiniProtocolNum
  (MiniProtocolData muxMode initiatorCtx peerAddr m a b)
csMiniProtocolMap :: forall (muxMode :: Mode) initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
ConnectionState muxMode initiatorCtx peerAddr versionData m a b
-> Map
     MiniProtocolNum
     (MiniProtocolData muxMode initiatorCtx peerAddr m a b)
csMiniProtocolMap :: Map
  MiniProtocolNum
  (MiniProtocolData muxMode initiatorCtx peerAddr m a b)
csMiniProtocolMap }),
           SJust rc :: ResponderCounters
rc@ResponderCounters { Int
numTraceHotResponders :: Int
numTraceHotResponders :: ResponderCounters -> Int
numTraceHotResponders,
                                        Int
numTraceNonHotResponders :: Int
numTraceNonHotResponders :: ResponderCounters -> Int
numTraceNonHotResponders }) -> do
            let miniProtocolTemp :: ProtocolTemperature
miniProtocolTemp = MiniProtocolNum
-> Map
     MiniProtocolNum
     (MiniProtocolData muxMode initiatorCtx peerAddr m a b)
-> ProtocolTemperature
forall {k} {muxMode :: Mode} {initiatorCtx} {peerAddr}
       {m :: * -> *} {a} {b}.
Ord k =>
k
-> Map k (MiniProtocolData muxMode initiatorCtx peerAddr m a b)
-> ProtocolTemperature
getProtocolTemp MiniProtocolNum
miniProtocolNum Map
  MiniProtocolNum
  (MiniProtocolData muxMode initiatorCtx peerAddr m a b)
csMiniProtocolMap
                commit :: STM m ()
commit = do
                  case (Int
numTraceNonHotResponders, Int
numTraceHotResponders) of
                    (Int
0, Int
0) -> InboundGovernorInfoChannel
  muxMode initiatorCtx peerAddr versionData ByteString m a b
-> Event
     muxMode
     (Handle
        muxMode
        initiatorCtx
        (ResponderContext peerAddr)
        versionData
        ByteString
        m
        a
        b)
     initiatorCtx
     peerAddr
     versionData
     m
     a
     b
-> STM m ()
forall a (m :: * -> *). InformationChannel a m -> a -> STM m ()
InfoChannel.writeMessage InboundGovernorInfoChannel
  muxMode initiatorCtx peerAddr versionData ByteString m a b
infoChannel
                                                       (ConnectionId peerAddr
-> Event
     muxMode
     (Handle
        muxMode
        initiatorCtx
        (ResponderContext peerAddr)
        versionData
        ByteString
        m
        a
        b)
     initiatorCtx
     peerAddr
     versionData
     m
     a
     b
forall (muxMode :: Mode) handle initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
ConnectionId peerAddr
-> Event muxMode handle initiatorCtx peerAddr versionData m a b
AwakeRemote ConnectionId peerAddr
peer)
                    (Int, Int)
_      -> () -> STM m ()
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
                  case (ProtocolTemperature
miniProtocolTemp, Int
numTraceHotResponders) of
                    (ProtocolTemperature
Hot, Int
0) -> InboundGovernorInfoChannel
  muxMode initiatorCtx peerAddr versionData ByteString m a b
-> Event
     muxMode
     (Handle
        muxMode
        initiatorCtx
        (ResponderContext peerAddr)
        versionData
        ByteString
        m
        a
        b)
     initiatorCtx
     peerAddr
     versionData
     m
     a
     b
-> STM m ()
forall a (m :: * -> *). InformationChannel a m -> a -> STM m ()
InfoChannel.writeMessage InboundGovernorInfoChannel
  muxMode initiatorCtx peerAddr versionData ByteString m a b
infoChannel
                                                         (ConnectionId peerAddr
-> Event
     muxMode
     (Handle
        muxMode
        initiatorCtx
        (ResponderContext peerAddr)
        versionData
        ByteString
        m
        a
        b)
     initiatorCtx
     peerAddr
     versionData
     m
     a
     b
forall (muxMode :: Mode) handle initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
ConnectionId peerAddr
-> Event muxMode handle initiatorCtx peerAddr versionData m a b
RemotePromotedToHot ConnectionId peerAddr
peer)
                    (ProtocolTemperature, Int)
_        -> () -> STM m ()
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
                  case ProtocolTemperature
miniProtocolTemp of
                    ProtocolTemperature
Hot    -> StrictTVar m (StrictMaybe ResponderCounters)
-> StrictMaybe ResponderCounters -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m (StrictMaybe ResponderCounters)
countersVar (StrictMaybe ResponderCounters -> STM m ())
-> StrictMaybe ResponderCounters -> STM m ()
forall a b. (a -> b) -> a -> b
$
                                ResponderCounters -> StrictMaybe ResponderCounters
forall a. a -> StrictMaybe a
SJust ResponderCounters
rc { numTraceHotResponders =
                                             succ numTraceHotResponders }
                    ProtocolTemperature
_orNot -> StrictTVar m (StrictMaybe ResponderCounters)
-> StrictMaybe ResponderCounters -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m (StrictMaybe ResponderCounters)
countersVar (StrictMaybe ResponderCounters -> STM m ())
-> StrictMaybe ResponderCounters -> STM m ()
forall a b. (a -> b) -> a -> b
$
                                ResponderCounters -> StrictMaybe ResponderCounters
forall a. a -> StrictMaybe a
SJust ResponderCounters
rc { numTraceNonHotResponders =
                                             succ numTraceNonHotResponders }

            case RemoteState m
csRemoteState of
              -- we retry on expired because we let the IG
              -- loop handle this peer. If the connection is released,
              -- and CM reports CommitTr, this peer will disappear
              -- from the connections so on retry we will hit the
              -- _otherwise clause instead and promotion will fail,
              -- as it should. Otherwise, if KeepTr is returned,
              -- we can handle 'AwakeRemote' from this peer.
              RemoteIdle STM m Bool
timeoutSTM -> do
                expired <- STM m Bool
timeoutSTM
                if expired then retry else commit
              RemoteState m
_ -> STM m ()
commit

          (Maybe
   (ConnectionState muxMode initiatorCtx peerAddr versionData m a b),
 StrictMaybe ResponderCounters)
_otherwise -> StrictTVar m (StrictMaybe ResponderCounters)
-> StrictMaybe ResponderCounters -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m (StrictMaybe ResponderCounters)
countersVar StrictMaybe ResponderCounters
forall a. StrictMaybe a
SNothing

      (Trace
_, Bool
True) | Just MiniProtocolNum
miniProtocolNum <- Trace -> Maybe MiniProtocolNum
miniProtocolTerminated Trace
trace -> STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically do
        connections <- State muxMode initiatorCtx peerAddr versionData m a b
-> Map
     (ConnectionId peerAddr)
     (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
forall (muxMode :: Mode) initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
State muxMode initiatorCtx peerAddr versionData m a b
-> Map
     (ConnectionId peerAddr)
     (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
connections (State muxMode initiatorCtx peerAddr versionData m a b
 -> Map
      (ConnectionId peerAddr)
      (ConnectionState muxMode initiatorCtx peerAddr versionData m a b))
-> STM m (State muxMode initiatorCtx peerAddr versionData m a b)
-> STM
     m
     (Map
        (ConnectionId peerAddr)
        (ConnectionState muxMode initiatorCtx peerAddr versionData m a b))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StrictTVar
  m (State muxMode initiatorCtx peerAddr versionData m a b)
-> STM m (State muxMode initiatorCtx peerAddr versionData m a b)
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar
  m (State muxMode initiatorCtx peerAddr versionData m a b)
stateVar
        mCounters   <- readTVar countersVar
        case (Map.lookup peer connections, mCounters) of
          (Just (ConnectionState { Mux muxMode m
csMux :: forall (muxMode :: Mode) initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
ConnectionState muxMode initiatorCtx peerAddr versionData m a b
-> Mux muxMode m
csMux :: Mux muxMode m
csMux,
                                   versionData
csVersionData :: forall (muxMode :: Mode) initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
ConnectionState muxMode initiatorCtx peerAddr versionData m a b
-> versionData
csVersionData :: versionData
csVersionData,
                                   Map
  MiniProtocolNum
  (MiniProtocolData muxMode initiatorCtx peerAddr m a b)
csMiniProtocolMap :: forall (muxMode :: Mode) initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
ConnectionState muxMode initiatorCtx peerAddr versionData m a b
-> Map
     MiniProtocolNum
     (MiniProtocolData muxMode initiatorCtx peerAddr m a b)
csMiniProtocolMap :: Map
  MiniProtocolNum
  (MiniProtocolData muxMode initiatorCtx peerAddr m a b)
csMiniProtocolMap,
                                   Map MiniProtocolNum (STM m (Either SomeException b))
csCompletionMap :: forall (muxMode :: Mode) initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
ConnectionState muxMode initiatorCtx peerAddr versionData m a b
-> Map MiniProtocolNum (STM m (Either SomeException b))
csCompletionMap :: Map MiniProtocolNum (STM m (Either SomeException b))
csCompletionMap }),
           SJust rc :: ResponderCounters
rc@ResponderCounters { Int
numTraceHotResponders :: ResponderCounters -> Int
numTraceHotResponders :: Int
numTraceHotResponders,
                                        Int
numTraceNonHotResponders :: ResponderCounters -> Int
numTraceNonHotResponders :: Int
numTraceNonHotResponders }) -> do
            InboundGovernorInfoChannel
  muxMode initiatorCtx peerAddr versionData ByteString m a b
-> Event
     muxMode
     (Handle
        muxMode
        initiatorCtx
        (ResponderContext peerAddr)
        versionData
        ByteString
        m
        a
        b)
     initiatorCtx
     peerAddr
     versionData
     m
     a
     b
-> STM m ()
forall a (m :: * -> *). InformationChannel a m -> a -> STM m ()
InfoChannel.writeMessage InboundGovernorInfoChannel
  muxMode initiatorCtx peerAddr versionData ByteString m a b
infoChannel (Event
   muxMode
   (Handle
      muxMode
      initiatorCtx
      (ResponderContext peerAddr)
      versionData
      ByteString
      m
      a
      b)
   initiatorCtx
   peerAddr
   versionData
   m
   a
   b
 -> STM m ())
-> Event
     muxMode
     (Handle
        muxMode
        initiatorCtx
        (ResponderContext peerAddr)
        versionData
        ByteString
        m
        a
        b)
     initiatorCtx
     peerAddr
     versionData
     m
     a
     b
-> STM m ()
forall a b. (a -> b) -> a -> b
$
              Terminated muxMode initiatorCtx peerAddr m a b
-> Event
     muxMode
     (Handle
        muxMode
        initiatorCtx
        (ResponderContext peerAddr)
        versionData
        ByteString
        m
        a
        b)
     initiatorCtx
     peerAddr
     versionData
     m
     a
     b
forall (muxMode :: Mode) handle initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
Terminated muxMode initiatorCtx peerAddr m a b
-> Event muxMode handle initiatorCtx peerAddr versionData m a b
MiniProtocolTerminated (Terminated muxMode initiatorCtx peerAddr m a b
 -> Event
      muxMode
      (Handle
         muxMode
         initiatorCtx
         (ResponderContext peerAddr)
         versionData
         ByteString
         m
         a
         b)
      initiatorCtx
      peerAddr
      versionData
      m
      a
      b)
-> Terminated muxMode initiatorCtx peerAddr m a b
-> Event
     muxMode
     (Handle
        muxMode
        initiatorCtx
        (ResponderContext peerAddr)
        versionData
        ByteString
        m
        a
        b)
     initiatorCtx
     peerAddr
     versionData
     m
     a
     b
forall a b. (a -> b) -> a -> b
$ Terminated {
                tConnId :: ConnectionId peerAddr
tConnId = ConnectionId peerAddr
peer,
                tMux :: Mux muxMode m
tMux = Mux muxMode m
csMux,
                tMiniProtocolData :: MiniProtocolData muxMode initiatorCtx peerAddr m a b
tMiniProtocolData = Map
  MiniProtocolNum
  (MiniProtocolData muxMode initiatorCtx peerAddr m a b)
csMiniProtocolMap Map
  MiniProtocolNum
  (MiniProtocolData muxMode initiatorCtx peerAddr m a b)
-> MiniProtocolNum
-> MiniProtocolData muxMode initiatorCtx peerAddr m a b
forall k a. Ord k => Map k a -> k -> a
Map.! MiniProtocolNum
miniProtocolNum,
                tDataFlow :: DataFlow
tDataFlow = versionData -> DataFlow
connectionDataFlow versionData
csVersionData,
                tResult :: STM m (Either SomeException b)
tResult = Map MiniProtocolNum (STM m (Either SomeException b))
csCompletionMap Map MiniProtocolNum (STM m (Either SomeException b))
-> MiniProtocolNum -> STM m (Either SomeException b)
forall k a. Ord k => Map k a -> k -> a
Map.! MiniProtocolNum
miniProtocolNum }
            case Trace
trace of
              Mux.TraceCleanExit {} -> do
                let miniProtocolTemp :: ProtocolTemperature
miniProtocolTemp = MiniProtocolNum
-> Map
     MiniProtocolNum
     (MiniProtocolData muxMode initiatorCtx peerAddr m a b)
-> ProtocolTemperature
forall {k} {muxMode :: Mode} {initiatorCtx} {peerAddr}
       {m :: * -> *} {a} {b}.
Ord k =>
k
-> Map k (MiniProtocolData muxMode initiatorCtx peerAddr m a b)
-> ProtocolTemperature
getProtocolTemp MiniProtocolNum
miniProtocolNum Map
  MiniProtocolNum
  (MiniProtocolData muxMode initiatorCtx peerAddr m a b)
csMiniProtocolMap
                case (ProtocolTemperature
miniProtocolTemp, Int
numTraceHotResponders) of
                  (ProtocolTemperature
Hot, Int
1) -> InboundGovernorInfoChannel
  muxMode initiatorCtx peerAddr versionData ByteString m a b
-> Event
     muxMode
     (Handle
        muxMode
        initiatorCtx
        (ResponderContext peerAddr)
        versionData
        ByteString
        m
        a
        b)
     initiatorCtx
     peerAddr
     versionData
     m
     a
     b
-> STM m ()
forall a (m :: * -> *). InformationChannel a m -> a -> STM m ()
InfoChannel.writeMessage InboundGovernorInfoChannel
  muxMode initiatorCtx peerAddr versionData ByteString m a b
infoChannel
                                                       (ConnectionId peerAddr
-> Event
     muxMode
     (Handle
        muxMode
        initiatorCtx
        (ResponderContext peerAddr)
        versionData
        ByteString
        m
        a
        b)
     initiatorCtx
     peerAddr
     versionData
     m
     a
     b
forall (muxMode :: Mode) handle initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
ConnectionId peerAddr
-> Event muxMode handle initiatorCtx peerAddr versionData m a b
RemoteDemotedToWarm ConnectionId peerAddr
peer)
                  (ProtocolTemperature, Int)
_        -> () -> STM m ()
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
                Bool -> STM m () -> STM m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
numTraceHotResponders Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
numTraceNonHotResponders Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
1) (STM m () -> STM m ()) -> STM m () -> STM m ()
forall a b. (a -> b) -> a -> b
$
                  InboundGovernorInfoChannel
  muxMode initiatorCtx peerAddr versionData ByteString m a b
-> Event
     muxMode
     (Handle
        muxMode
        initiatorCtx
        (ResponderContext peerAddr)
        versionData
        ByteString
        m
        a
        b)
     initiatorCtx
     peerAddr
     versionData
     m
     a
     b
-> STM m ()
forall a (m :: * -> *). InformationChannel a m -> a -> STM m ()
InfoChannel.writeMessage InboundGovernorInfoChannel
  muxMode initiatorCtx peerAddr versionData ByteString m a b
infoChannel
                                           (ConnectionId peerAddr
-> Event
     muxMode
     (Handle
        muxMode
        initiatorCtx
        (ResponderContext peerAddr)
        versionData
        ByteString
        m
        a
        b)
     initiatorCtx
     peerAddr
     versionData
     m
     a
     b
forall (muxMode :: Mode) handle initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
ConnectionId peerAddr
-> Event muxMode handle initiatorCtx peerAddr versionData m a b
WaitIdleRemote ConnectionId peerAddr
peer)
                case ProtocolTemperature
miniProtocolTemp of
                    ProtocolTemperature
Hot    -> StrictTVar m (StrictMaybe ResponderCounters)
-> StrictMaybe ResponderCounters -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m (StrictMaybe ResponderCounters)
countersVar (StrictMaybe ResponderCounters -> STM m ())
-> StrictMaybe ResponderCounters -> STM m ()
forall a b. (a -> b) -> a -> b
$
                                ResponderCounters -> StrictMaybe ResponderCounters
forall a. a -> StrictMaybe a
SJust ResponderCounters
rc { numTraceHotResponders =
                                             pred numTraceHotResponders }
                    ProtocolTemperature
_orNot -> StrictTVar m (StrictMaybe ResponderCounters)
-> StrictMaybe ResponderCounters -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m (StrictMaybe ResponderCounters)
countersVar (StrictMaybe ResponderCounters -> STM m ())
-> StrictMaybe ResponderCounters -> STM m ()
forall a b. (a -> b) -> a -> b
$
                                ResponderCounters -> StrictMaybe ResponderCounters
forall a. a -> StrictMaybe a
SJust ResponderCounters
rc { numTraceNonHotResponders =
                                             pred numTraceNonHotResponders }

              Trace
_otherwise -> StrictTVar m (StrictMaybe ResponderCounters)
-> StrictMaybe ResponderCounters -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m (StrictMaybe ResponderCounters)
countersVar StrictMaybe ResponderCounters
forall a. StrictMaybe a
SNothing

          (Maybe
   (ConnectionState muxMode initiatorCtx peerAddr versionData m a b),
 StrictMaybe ResponderCounters)
_otherwise -> StrictTVar m (StrictMaybe ResponderCounters)
-> StrictMaybe ResponderCounters -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m (StrictMaybe ResponderCounters)
countersVar StrictMaybe ResponderCounters
forall a. StrictMaybe a
SNothing


      (Trace
_, Bool
True) | Bool
True <- Trace -> Bool
muxStopped Trace
trace -> STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically do
        State { connections } <- StrictTVar
  m (State muxMode initiatorCtx peerAddr versionData m a b)
-> STM m (State muxMode initiatorCtx peerAddr versionData m a b)
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar
  m (State muxMode initiatorCtx peerAddr versionData m a b)
stateVar
        case Map.lookup peer connections of
          Just ConnectionState {Mux muxMode m
csMux :: forall (muxMode :: Mode) initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
ConnectionState muxMode initiatorCtx peerAddr versionData m a b
-> Mux muxMode m
csMux :: Mux muxMode m
csMux} ->
            InboundGovernorInfoChannel
  muxMode initiatorCtx peerAddr versionData ByteString m a b
-> Event
     muxMode
     (Handle
        muxMode
        initiatorCtx
        (ResponderContext peerAddr)
        versionData
        ByteString
        m
        a
        b)
     initiatorCtx
     peerAddr
     versionData
     m
     a
     b
-> STM m ()
forall a (m :: * -> *). InformationChannel a m -> a -> STM m ()
InfoChannel.writeMessage InboundGovernorInfoChannel
  muxMode initiatorCtx peerAddr versionData ByteString m a b
infoChannel (Event
   muxMode
   (Handle
      muxMode
      initiatorCtx
      (ResponderContext peerAddr)
      versionData
      ByteString
      m
      a
      b)
   initiatorCtx
   peerAddr
   versionData
   m
   a
   b
 -> STM m ())
-> Event
     muxMode
     (Handle
        muxMode
        initiatorCtx
        (ResponderContext peerAddr)
        versionData
        ByteString
        m
        a
        b)
     initiatorCtx
     peerAddr
     versionData
     m
     a
     b
-> STM m ()
forall a b. (a -> b) -> a -> b
$
              ConnectionId peerAddr
-> STM m (Maybe SomeException)
-> Event
     muxMode
     (Handle
        muxMode
        initiatorCtx
        (ResponderContext peerAddr)
        versionData
        ByteString
        m
        a
        b)
     initiatorCtx
     peerAddr
     versionData
     m
     a
     b
forall (muxMode :: Mode) handle initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
ConnectionId peerAddr
-> STM m (Maybe SomeException)
-> Event muxMode handle initiatorCtx peerAddr versionData m a b
MuxFinished ConnectionId peerAddr
peer (Mux muxMode m -> STM m (Maybe SomeException)
forall (m :: * -> *) (mode :: Mode).
MonadSTM m =>
Mux mode m -> STM m (Maybe SomeException)
Mux.stopped Mux muxMode m
csMux)
          Maybe
  (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
_otherwise -> () -> STM m ()
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
        writeTVar countersVar SNothing

      (Trace, Bool)
_otherwise -> () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
    where
      muxStopped :: Trace -> Bool
muxStopped = \case
        Trace
Mux.TraceStopped -> Bool
True
        Mux.TraceState State
Mux.Dead -> Bool
True
        Trace
_otherwise -> Bool
False

      getProtocolTemp :: k
-> Map k (MiniProtocolData muxMode initiatorCtx peerAddr m a b)
-> ProtocolTemperature
getProtocolTemp k
miniProtocolNum Map k (MiniProtocolData muxMode initiatorCtx peerAddr m a b)
csMiniProtocolMap  =
        let miniData :: MiniProtocolData muxMode initiatorCtx peerAddr m a b
miniData = Map k (MiniProtocolData muxMode initiatorCtx peerAddr m a b)
csMiniProtocolMap Map k (MiniProtocolData muxMode initiatorCtx peerAddr m a b)
-> k -> MiniProtocolData muxMode initiatorCtx peerAddr m a b
forall k a. Ord k => Map k a -> k -> a
Map.! k
miniProtocolNum
         in MiniProtocolData muxMode initiatorCtx peerAddr m a b
-> ProtocolTemperature
forall (muxMode :: Mode) initiatorCtx peerAddr (m :: * -> *) a b.
MiniProtocolData muxMode initiatorCtx peerAddr m a b
-> ProtocolTemperature
mpdMiniProtocolTemp MiniProtocolData muxMode initiatorCtx peerAddr m a b
miniData

      miniProtocolTerminated :: Trace -> Maybe MiniProtocolNum
miniProtocolTerminated = \case
        Mux.TraceCleanExit MiniProtocolNum
miniProtocolNum MiniProtocolDir
Mux.ResponderDir -> MiniProtocolNum -> Maybe MiniProtocolNum
forall a. a -> Maybe a
Just MiniProtocolNum
miniProtocolNum
        Mux.TraceExceptionExit MiniProtocolNum
miniProtocolNum MiniProtocolDir
Mux.ResponderDir SomeException
_e -> MiniProtocolNum -> Maybe MiniProtocolNum
forall a. a -> Maybe a
Just MiniProtocolNum
miniProtocolNum
        Trace
_otherwise -> Maybe MiniProtocolNum
forall a. Maybe a
Nothing

      miniProtocolStarted :: Trace -> Maybe MiniProtocolNum
miniProtocolStarted = \case
        -- is any responder started eagerly???
        Mux.TraceStartEagerly MiniProtocolNum
miniProtocolNum MiniProtocolDir
Mux.ResponderDir -> MiniProtocolNum -> Maybe MiniProtocolNum
forall a. a -> Maybe a
Just MiniProtocolNum
miniProtocolNum
        Mux.TraceStartedOnDemand MiniProtocolNum
miniProtocolNum MiniProtocolDir
Mux.ResponderDir -> MiniProtocolNum -> Maybe MiniProtocolNum
forall a. a -> Maybe a
Just MiniProtocolNum
miniProtocolNum
        Trace
_otherwise -> Maybe MiniProtocolNum
forall a. Maybe a
Nothing


-- | Run a responder mini-protocol.
--
-- @'HasResponder' mode ~ True@ is used to rule out
-- 'InitiatorProtocolOnly' case.
--
runResponder :: forall (mode :: Mux.Mode) initiatorCtx peerAddr m a b.
                 ( Alternative (STM m)
                 , HasResponder mode ~ True
                 , MonadAsync       m
                 , MonadLabelledSTM m
                 , MonadCatch       m
                 , MonadMask        m
                 , MonadThrow  (STM m)
                 )
              => Mux.Mux mode m
              -> MiniProtocolData mode initiatorCtx peerAddr m a b
              -> m (Either SomeException (STM m (Either SomeException b)))
runResponder :: forall (mode :: Mode) initiatorCtx peerAddr (m :: * -> *) a b.
(Alternative (STM m), HasResponder mode ~ 'True, MonadAsync m,
 MonadLabelledSTM m, MonadCatch m, MonadMask m,
 MonadThrow (STM m)) =>
Mux mode m
-> MiniProtocolData mode initiatorCtx peerAddr m a b
-> m (Either SomeException (STM m (Either SomeException b)))
runResponder Mux mode m
mux
             MiniProtocolData {
               mpdMiniProtocol :: forall (muxMode :: Mode) initiatorCtx peerAddr (m :: * -> *) a b.
MiniProtocolData muxMode initiatorCtx peerAddr m a b
-> MiniProtocol
     muxMode initiatorCtx (ResponderContext peerAddr) ByteString m a b
mpdMiniProtocol     = MiniProtocol
  mode initiatorCtx (ResponderContext peerAddr) ByteString m a b
miniProtocol,
               mpdResponderContext :: forall (muxMode :: Mode) initiatorCtx peerAddr (m :: * -> *) a b.
MiniProtocolData muxMode initiatorCtx peerAddr m a b
-> ResponderContext peerAddr
mpdResponderContext = ResponderContext peerAddr
responderContext
             } =
    -- do not catch asynchronous exceptions, which are non recoverable
    (SomeException -> Maybe SomeException)
-> m (STM m (Either SomeException b))
-> m (Either SomeException (STM m (Either SomeException b)))
forall e b a.
Exception e =>
(e -> Maybe b) -> m a -> m (Either b a)
forall (m :: * -> *) e b a.
(MonadCatch m, Exception e) =>
(e -> Maybe b) -> m a -> m (Either b a)
tryJust (\SomeException
e -> case SomeException -> Maybe SomeAsyncException
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e of
              Just (SomeAsyncException e
_) -> Maybe SomeException
forall a. Maybe a
Nothing
              Maybe SomeAsyncException
Nothing                     -> SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just SomeException
e) (m (STM m (Either SomeException b))
 -> m (Either SomeException (STM m (Either SomeException b))))
-> m (STM m (Either SomeException b))
-> m (Either SomeException (STM m (Either SomeException b)))
forall a b. (a -> b) -> a -> b
$
      case MiniProtocol
  mode initiatorCtx (ResponderContext peerAddr) ByteString m a b
-> RunMiniProtocol
     mode initiatorCtx (ResponderContext peerAddr) ByteString m a b
forall (mode :: Mode) initiatorCtx responderCtx bytes (m :: * -> *)
       a b.
MiniProtocol mode initiatorCtx responderCtx bytes m a b
-> RunMiniProtocol mode initiatorCtx responderCtx bytes m a b
miniProtocolRun MiniProtocol
  mode initiatorCtx (ResponderContext peerAddr) ByteString m a b
miniProtocol of
        ResponderProtocolOnly MiniProtocolCb (ResponderContext peerAddr) ByteString m b
responder ->
          Mux mode m
-> MiniProtocolNum
-> MiniProtocolDirection mode
-> StartOnDemandOrEagerly
-> (ByteChannel m -> m (b, Maybe ByteString))
-> m (STM m (Either SomeException b))
forall (mode :: Mode) (m :: * -> *) a.
(Alternative (STM m), MonadSTM m, MonadThrow m,
 MonadThrow (STM m)) =>
Mux mode m
-> MiniProtocolNum
-> MiniProtocolDirection mode
-> StartOnDemandOrEagerly
-> (ByteChannel m -> m (a, Maybe ByteString))
-> m (STM m (Either SomeException a))
Mux.runMiniProtocol
            Mux mode m
mux (MiniProtocol
  mode initiatorCtx (ResponderContext peerAddr) ByteString m a b
-> MiniProtocolNum
forall (mode :: Mode) initiatorCtx responderCtx bytes (m :: * -> *)
       a b.
MiniProtocol mode initiatorCtx responderCtx bytes m a b
-> MiniProtocolNum
miniProtocolNum MiniProtocol
  mode initiatorCtx (ResponderContext peerAddr) ByteString m a b
miniProtocol)
            MiniProtocolDirection mode
MiniProtocolDirection 'ResponderMode
Mux.ResponderDirectionOnly
            (MiniProtocol
  mode initiatorCtx (ResponderContext peerAddr) ByteString m a b
-> StartOnDemandOrEagerly
forall (mode :: Mode) initiatorCtx responderCtx bytes (m :: * -> *)
       a b.
MiniProtocol mode initiatorCtx responderCtx bytes m a b
-> StartOnDemandOrEagerly
miniProtocolStart MiniProtocol
  mode initiatorCtx (ResponderContext peerAddr) ByteString m a b
miniProtocol)
            (MiniProtocolCb (ResponderContext peerAddr) ByteString m b
-> ResponderContext peerAddr
-> ByteChannel m
-> m (b, Maybe ByteString)
forall ctx bytes (m :: * -> *) a.
MiniProtocolCb ctx bytes m a
-> ctx -> Channel m bytes -> m (a, Maybe bytes)
runMiniProtocolCb MiniProtocolCb (ResponderContext peerAddr) ByteString m b
responder ResponderContext peerAddr
responderContext)

        InitiatorAndResponderProtocol MiniProtocolCb initiatorCtx ByteString m a
_ MiniProtocolCb (ResponderContext peerAddr) ByteString m b
responder ->
          Mux mode m
-> MiniProtocolNum
-> MiniProtocolDirection mode
-> StartOnDemandOrEagerly
-> (ByteChannel m -> m (b, Maybe ByteString))
-> m (STM m (Either SomeException b))
forall (mode :: Mode) (m :: * -> *) a.
(Alternative (STM m), MonadSTM m, MonadThrow m,
 MonadThrow (STM m)) =>
Mux mode m
-> MiniProtocolNum
-> MiniProtocolDirection mode
-> StartOnDemandOrEagerly
-> (ByteChannel m -> m (a, Maybe ByteString))
-> m (STM m (Either SomeException a))
Mux.runMiniProtocol
            Mux mode m
mux (MiniProtocol
  mode initiatorCtx (ResponderContext peerAddr) ByteString m a b
-> MiniProtocolNum
forall (mode :: Mode) initiatorCtx responderCtx bytes (m :: * -> *)
       a b.
MiniProtocol mode initiatorCtx responderCtx bytes m a b
-> MiniProtocolNum
miniProtocolNum MiniProtocol
  mode initiatorCtx (ResponderContext peerAddr) ByteString m a b
miniProtocol)
            MiniProtocolDirection mode
MiniProtocolDirection 'InitiatorResponderMode
Mux.ResponderDirection
            (MiniProtocol
  mode initiatorCtx (ResponderContext peerAddr) ByteString m a b
-> StartOnDemandOrEagerly
forall (mode :: Mode) initiatorCtx responderCtx bytes (m :: * -> *)
       a b.
MiniProtocol mode initiatorCtx responderCtx bytes m a b
-> StartOnDemandOrEagerly
miniProtocolStart MiniProtocol
  mode initiatorCtx (ResponderContext peerAddr) ByteString m a b
miniProtocol)
            (MiniProtocolCb (ResponderContext peerAddr) ByteString m b
-> ResponderContext peerAddr
-> ByteChannel m
-> m (b, Maybe ByteString)
forall ctx bytes (m :: * -> *) a.
MiniProtocolCb ctx bytes m a
-> ctx -> Channel m bytes -> m (a, Maybe bytes)
runMiniProtocolCb MiniProtocolCb (ResponderContext peerAddr) ByteString m b
responder ResponderContext peerAddr
responderContext)


maturedPeers :: Ord peerAddr
             => Time
             -> OrdPSQ peerAddr Time versionData
             -> (Map peerAddr versionData, OrdPSQ peerAddr Time versionData)
maturedPeers :: forall peerAddr versionData.
Ord peerAddr =>
Time
-> OrdPSQ peerAddr Time versionData
-> (Map peerAddr versionData, OrdPSQ peerAddr Time versionData)
maturedPeers Time
time OrdPSQ peerAddr Time versionData
freshPeers =
      ([(peerAddr, Time, versionData)] -> Map peerAddr versionData)
-> ([(peerAddr, Time, versionData)],
    OrdPSQ peerAddr Time versionData)
-> (Map peerAddr versionData, OrdPSQ peerAddr Time versionData)
forall a b c. (a -> b) -> (a, c) -> (b, c)
forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first ([(peerAddr, versionData)] -> Map peerAddr versionData
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList ([(peerAddr, versionData)] -> Map peerAddr versionData)
-> ([(peerAddr, Time, versionData)] -> [(peerAddr, versionData)])
-> [(peerAddr, Time, versionData)]
-> Map peerAddr versionData
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((peerAddr, Time, versionData) -> (peerAddr, versionData))
-> [(peerAddr, Time, versionData)] -> [(peerAddr, versionData)]
forall a b. (a -> b) -> [a] -> [b]
map (\(peerAddr
addr, Time
_p, versionData
v) -> (peerAddr
addr, versionData
v)))
    (([(peerAddr, Time, versionData)],
  OrdPSQ peerAddr Time versionData)
 -> (Map peerAddr versionData, OrdPSQ peerAddr Time versionData))
-> ([(peerAddr, Time, versionData)],
    OrdPSQ peerAddr Time versionData)
-> (Map peerAddr versionData, OrdPSQ peerAddr Time versionData)
forall a b. (a -> b) -> a -> b
$ Time
-> OrdPSQ peerAddr Time versionData
-> ([(peerAddr, Time, versionData)],
    OrdPSQ peerAddr Time versionData)
forall k p v.
(Ord k, Ord p) =>
p -> OrdPSQ k p v -> ([(k, p, v)], OrdPSQ k p v)
OrdPSQ.atMostView ((-DiffTime
inboundMaturePeerDelay) DiffTime -> Time -> Time
`addTime` Time
time)
                           OrdPSQ peerAddr Time versionData
freshPeers

--
-- Trace
--


-- | 'Nothing' represents uninitialised state.
--
type RemoteTransition = Transition' (Maybe RemoteSt)

type RemoteTransitionTrace peerAddr = TransitionTrace' peerAddr (Maybe RemoteSt)

mkRemoteTransitionTrace :: Ord peerAddr
                        => ConnectionId peerAddr
                        -> State muxMode initiatorCtx peerAddr versionData m a b
                        -> State muxMode initiatorCtx peerAddr versionData m a b
                        -> RemoteTransitionTrace peerAddr
mkRemoteTransitionTrace :: forall peerAddr (muxMode :: Mode) initiatorCtx versionData
       (m :: * -> *) a b.
Ord peerAddr =>
ConnectionId peerAddr
-> State muxMode initiatorCtx peerAddr versionData m a b
-> State muxMode initiatorCtx peerAddr versionData m a b
-> RemoteTransitionTrace peerAddr
mkRemoteTransitionTrace ConnectionId peerAddr
connId State muxMode initiatorCtx peerAddr versionData m a b
fromState State muxMode initiatorCtx peerAddr versionData m a b
toState =
    peerAddr
-> Transition' (Maybe RemoteSt)
-> TransitionTrace' peerAddr (Maybe RemoteSt)
forall id state.
id -> Transition' state -> TransitionTrace' id state
TransitionTrace
      (ConnectionId peerAddr -> peerAddr
forall addr. ConnectionId addr -> addr
remoteAddress ConnectionId peerAddr
connId)
      Transition { fromState :: Maybe RemoteSt
fromState = RemoteState m -> RemoteSt
forall (m :: * -> *). RemoteState m -> RemoteSt
mkRemoteSt
                             (RemoteState m -> RemoteSt)
-> (ConnectionState muxMode initiatorCtx peerAddr versionData m a b
    -> RemoteState m)
-> ConnectionState muxMode initiatorCtx peerAddr versionData m a b
-> RemoteSt
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionState muxMode initiatorCtx peerAddr versionData m a b
-> RemoteState m
forall (muxMode :: Mode) initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
ConnectionState muxMode initiatorCtx peerAddr versionData m a b
-> RemoteState m
csRemoteState
                           (ConnectionState muxMode initiatorCtx peerAddr versionData m a b
 -> RemoteSt)
-> Maybe
     (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
-> Maybe RemoteSt
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ConnectionId peerAddr
-> Map
     (ConnectionId peerAddr)
     (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
-> Maybe
     (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup ConnectionId peerAddr
connId (State muxMode initiatorCtx peerAddr versionData m a b
-> Map
     (ConnectionId peerAddr)
     (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
forall (muxMode :: Mode) initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
State muxMode initiatorCtx peerAddr versionData m a b
-> Map
     (ConnectionId peerAddr)
     (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
connections State muxMode initiatorCtx peerAddr versionData m a b
fromState)
                 , toState :: Maybe RemoteSt
toState   = RemoteState m -> RemoteSt
forall (m :: * -> *). RemoteState m -> RemoteSt
mkRemoteSt
                             (RemoteState m -> RemoteSt)
-> (ConnectionState muxMode initiatorCtx peerAddr versionData m a b
    -> RemoteState m)
-> ConnectionState muxMode initiatorCtx peerAddr versionData m a b
-> RemoteSt
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionState muxMode initiatorCtx peerAddr versionData m a b
-> RemoteState m
forall (muxMode :: Mode) initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
ConnectionState muxMode initiatorCtx peerAddr versionData m a b
-> RemoteState m
csRemoteState
                           (ConnectionState muxMode initiatorCtx peerAddr versionData m a b
 -> RemoteSt)
-> Maybe
     (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
-> Maybe RemoteSt
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ConnectionId peerAddr
-> Map
     (ConnectionId peerAddr)
     (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
-> Maybe
     (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup ConnectionId peerAddr
connId (State muxMode initiatorCtx peerAddr versionData m a b
-> Map
     (ConnectionId peerAddr)
     (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
forall (muxMode :: Mode) initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
State muxMode initiatorCtx peerAddr versionData m a b
-> Map
     (ConnectionId peerAddr)
     (ConnectionState muxMode initiatorCtx peerAddr versionData m a b)
connections State muxMode initiatorCtx peerAddr versionData m a b
toState)
                 }


-- | A channel which instantiates to 'NewConnectionInfo' and
-- 'Handle'.
--
-- * /Producer:/ connection manger for duplex outbound connections.
-- * /Consumer:/ inbound governor.
--
type InboundGovernorInfoChannel (muxMode :: Mux.Mode) initiatorCtx peerAddr versionData bytes m a b =
    InformationChannel (Event (muxMode :: Mux.Mode) (Handle muxMode initiatorCtx (ResponderContext peerAddr) versionData bytes m a b) initiatorCtx peerAddr versionData m a b) m


-- | Announcement message for a new connection.
--
data NewConnectionInfo peerAddr handle

    -- | Announce a new connection.  /Inbound protocol governor/ will start
    -- responder protocols using 'StartOnDemand' strategy and monitor remote
    -- transitions: @PromotedToWarm^{Duplex}_{Remote}@ and
    -- @DemotedToCold^{dataFlow}_{Remote}@.
    = NewConnectionInfo
      !Provenance
      !(ConnectionId peerAddr)
      !DataFlow
      !handle

instance Show peerAddr
      => Show (NewConnectionInfo peerAddr handle) where
      show :: NewConnectionInfo peerAddr handle -> String
show (NewConnectionInfo Provenance
provenance ConnectionId peerAddr
connId DataFlow
dataFlow handle
_) =
        [String] -> String
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [ String
"NewConnectionInfo "
               , Provenance -> String
forall a. Show a => a -> String
show Provenance
provenance
               , String
" "
               , ConnectionId peerAddr -> String
forall a. Show a => a -> String
show ConnectionId peerAddr
connId
               , String
" "
               , DataFlow -> String
forall a. Show a => a -> String
show DataFlow
dataFlow
               ]


-- | Edge triggered events to which the /inbound protocol governor/ reacts.
--
data Event (muxMode :: Mux.Mode) handle initiatorCtx peerAddr versionData m a b
    -- | A request to start mini-protocol bundle, either from the server or from
    -- connection manager after a duplex connection was negotiated.
    --
    = NewConnection !(NewConnectionInfo peerAddr handle)

    -- | A multiplexer exited.
    --
    | MuxFinished            !(ConnectionId peerAddr) (STM m (Maybe SomeException))

    -- | A mini-protocol terminated either cleanly or abruptly.
    --
    | MiniProtocolTerminated !(Terminated muxMode initiatorCtx peerAddr m a b)

    -- | Transition from 'RemoteEstablished' to 'RemoteIdle'.
    --
    | WaitIdleRemote         !(ConnectionId peerAddr)

    -- | A remote @warm → hot@ transition.  It is scheduled as soon as all hot
    -- mini-protocols are running.
    --
    | RemotePromotedToHot    !(ConnectionId peerAddr)

    -- | A @hot → warm@ transition.  It is scheduled as soon as any hot
    -- mini-protocol terminates.
    --
    | RemoteDemotedToWarm    !(ConnectionId peerAddr)

    -- | Transition from 'RemoteIdle' to 'RemoteCold'.
    --
    | CommitRemote           !(ConnectionId peerAddr)

    -- | Transition from 'RemoteIdle' or 'RemoteCold' to 'RemoteEstablished'.
    --
    | AwakeRemote            !(ConnectionId peerAddr)

    -- | Update `igsMatureDuplexPeers` and `igsFreshDuplexPeers`.
    --
    | MaturedDuplexPeers   !(Map peerAddr versionData)         -- ^ newly matured duplex peers
                           !(OrdPSQ peerAddr Time versionData) -- ^ queue of fresh duplex peers

    | InactivityTimeout


-- STM transactions which detect 'Event's (signals)
--


-- | A signal which returns an 'Event'.  Signals are combined together and
-- passed used to fold the current state map.
--
type EventSignal (muxMode :: Mux.Mode) handle initiatorCtx peerAddr versionData m a b =
        ConnectionId peerAddr
     -> ConnectionState muxMode initiatorCtx peerAddr versionData m a b
     -> FirstToFinish (STM m) (Event muxMode handle initiatorCtx peerAddr versionData m a b)


-- | When a mini-protocol terminates we take 'Terminated' out of 'ConnectionState
-- and pass it to the main loop.  This is just enough to decide if we need to
-- restart a mini-protocol and to do the restart.
--
data Terminated muxMode initiatorCtx peerAddr m a b = Terminated {
    forall (muxMode :: Mode) initiatorCtx peerAddr (m :: * -> *) a b.
Terminated muxMode initiatorCtx peerAddr m a b
-> ConnectionId peerAddr
tConnId           :: !(ConnectionId peerAddr),
    forall (muxMode :: Mode) initiatorCtx peerAddr (m :: * -> *) a b.
Terminated muxMode initiatorCtx peerAddr m a b -> Mux muxMode m
tMux              :: !(Mux.Mux muxMode m),
    forall (muxMode :: Mode) initiatorCtx peerAddr (m :: * -> *) a b.
Terminated muxMode initiatorCtx peerAddr m a b
-> MiniProtocolData muxMode initiatorCtx peerAddr m a b
tMiniProtocolData :: !(MiniProtocolData muxMode initiatorCtx peerAddr m a b),
    forall (muxMode :: Mode) initiatorCtx peerAddr (m :: * -> *) a b.
Terminated muxMode initiatorCtx peerAddr m a b -> DataFlow
tDataFlow         :: !DataFlow,
    forall (muxMode :: Mode) initiatorCtx peerAddr (m :: * -> *) a b.
Terminated muxMode initiatorCtx peerAddr m a b
-> STM m (Either SomeException b)
tResult           :: STM m (Either SomeException b) -- !(Either SomeException b)
  }


-- | First peer for which the 'RemoteIdle' timeout expires.
--
firstPeerCommitRemote :: (Alternative (STM m), MonadSTM m)
                      => EventSignal muxMode handle initiatorCtx peerAddr versionData m a b
firstPeerCommitRemote :: forall (m :: * -> *) (muxMode :: Mode) handle initiatorCtx peerAddr
       versionData a b.
(Alternative (STM m), MonadSTM m) =>
EventSignal muxMode handle initiatorCtx peerAddr versionData m a b
firstPeerCommitRemote
    ConnectionId peerAddr
connId ConnectionState { RemoteState m
csRemoteState :: forall (muxMode :: Mode) initiatorCtx peerAddr versionData
       (m :: * -> *) a b.
ConnectionState muxMode initiatorCtx peerAddr versionData m a b
-> RemoteState m
csRemoteState :: RemoteState m
csRemoteState }
    = case RemoteState m
csRemoteState of
        -- the connection is already in 'RemoteCold' state
        RemoteState m
RemoteCold            -> FirstToFinish
  (STM m)
  (Event muxMode handle initiatorCtx peerAddr versionData m a b)
forall a. Monoid a => a
mempty
        RemoteState m
RemoteEstablished     -> FirstToFinish
  (STM m)
  (Event muxMode handle initiatorCtx peerAddr versionData m a b)
forall a. Monoid a => a
mempty
        RemoteIdle STM m Bool
timeoutSTM -> STM
  m (Event muxMode handle initiatorCtx peerAddr versionData m a b)
-> FirstToFinish
     (STM m)
     (Event muxMode handle initiatorCtx peerAddr versionData m a b)
forall (m :: * -> *) a. m a -> FirstToFinish m a
FirstToFinish do
          expired <- STM m Bool
timeoutSTM
          if expired then pure $ CommitRemote connId else retry


data IGAssertionLocation peerAddr
  = InboundGovernorLoop !(Maybe (ConnectionId peerAddr)) !AbstractState
  deriving Int -> IGAssertionLocation peerAddr -> ShowS
[IGAssertionLocation peerAddr] -> ShowS
IGAssertionLocation peerAddr -> String
(Int -> IGAssertionLocation peerAddr -> ShowS)
-> (IGAssertionLocation peerAddr -> String)
-> ([IGAssertionLocation peerAddr] -> ShowS)
-> Show (IGAssertionLocation peerAddr)
forall peerAddr.
Show peerAddr =>
Int -> IGAssertionLocation peerAddr -> ShowS
forall peerAddr.
Show peerAddr =>
[IGAssertionLocation peerAddr] -> ShowS
forall peerAddr.
Show peerAddr =>
IGAssertionLocation peerAddr -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: forall peerAddr.
Show peerAddr =>
Int -> IGAssertionLocation peerAddr -> ShowS
showsPrec :: Int -> IGAssertionLocation peerAddr -> ShowS
$cshow :: forall peerAddr.
Show peerAddr =>
IGAssertionLocation peerAddr -> String
show :: IGAssertionLocation peerAddr -> String
$cshowList :: forall peerAddr.
Show peerAddr =>
[IGAssertionLocation peerAddr] -> ShowS
showList :: [IGAssertionLocation peerAddr] -> ShowS
Show

data Trace peerAddr
    = TrNewConnection                !Provenance !(ConnectionId peerAddr)
    | TrResponderRestarted           !(ConnectionId peerAddr) !MiniProtocolNum
    | TrResponderStartFailure        !(ConnectionId peerAddr) !MiniProtocolNum !SomeException
    | TrResponderErrored             !(ConnectionId peerAddr) !MiniProtocolNum !SomeException
    | TrResponderStarted             !(ConnectionId peerAddr) !MiniProtocolNum
    | TrResponderTerminated          !(ConnectionId peerAddr) !MiniProtocolNum
    | TrPromotedToWarmRemote         !(ConnectionId peerAddr) !(OperationResult AbstractState)
    | TrPromotedToHotRemote          !(ConnectionId peerAddr)
    | TrDemotedToWarmRemote          !(ConnectionId peerAddr)
    | TrDemotedToColdRemote          !(ConnectionId peerAddr) !(OperationResult DemotedToColdRemoteTr)
    -- ^ All mini-protocols terminated.  The boolean is true if this connection
    -- was not used by p2p-governor, and thus the connection will be terminated.
    | TrWaitIdleRemote               !(ConnectionId peerAddr) !(OperationResult AbstractState)
    | TrMuxCleanExit                 !(ConnectionId peerAddr)
    | TrMuxErrored                   !(ConnectionId peerAddr) SomeException
    | TrInboundGovernorCounters      !Counters
    | TrRemoteState                  !(Map (ConnectionId peerAddr) RemoteSt)
    | TrUnexpectedlyFalseAssertion   !(IGAssertionLocation peerAddr)
    -- ^ This case is unexpected at call site.
    | TrInboundGovernorError         !SomeException
    | TrMaturedConnections           !(Set peerAddr) !(Set peerAddr)
    | TrInactive                     ![(peerAddr, Time)]
  deriving Int -> Trace peerAddr -> ShowS
[Trace peerAddr] -> ShowS
Trace peerAddr -> String
(Int -> Trace peerAddr -> ShowS)
-> (Trace peerAddr -> String)
-> ([Trace peerAddr] -> ShowS)
-> Show (Trace peerAddr)
forall peerAddr. Show peerAddr => Int -> Trace peerAddr -> ShowS
forall peerAddr. Show peerAddr => [Trace peerAddr] -> ShowS
forall peerAddr. Show peerAddr => Trace peerAddr -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: forall peerAddr. Show peerAddr => Int -> Trace peerAddr -> ShowS
showsPrec :: Int -> Trace peerAddr -> ShowS
$cshow :: forall peerAddr. Show peerAddr => Trace peerAddr -> String
show :: Trace peerAddr -> String
$cshowList :: forall peerAddr. Show peerAddr => [Trace peerAddr] -> ShowS
showList :: [Trace peerAddr] -> ShowS
Show


data Debug peerAddr versionData = forall muxMode initiatorCtx m a b.
    Debug (State muxMode initiatorCtx peerAddr versionData m a b)

data LoopDecision state peer = OnlyStateChange         !state
                             | StateWithPeerTransition !state !peer