{-# LANGUAGE BangPatterns              #-}
{-# LANGUAGE BlockArguments            #-}
{-# LANGUAGE DataKinds                 #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts          #-}
{-# LANGUAGE GADTs                     #-}
{-# LANGUAGE LambdaCase                #-}
{-# LANGUAGE NamedFieldPuns            #-}
{-# LANGUAGE RankNTypes                #-}
{-# LANGUAGE ScopedTypeVariables       #-}
{-# LANGUAGE TypeFamilyDependencies    #-}
{-# LANGUAGE TypeOperators             #-}

-- | Implementation of 'ConnectionHandler'
--
-- While connection manager responsibility is to keep track of resources:
-- sockets and threads running connection and their state changes (including
-- changes imposed by 'ConnectionHandler', e.g. weather a uni- or duplex- data
-- flow was negotiated), the responsibility of 'ConnectionHandler' is to:
--
-- * run handshake protocol on the underlying bearer
-- * start mux
--
-- 'ConnectionHandler' is run on each inbound or outbound connection and returns
-- 'Handle'.  Upon successful handshake negotiation it returns all the
-- necessary information to run mini-protocols.  Note that it is not responsible
-- for running them: that's what a server does or p2p-governor by means of
-- 'PeerStateActions'.
--
module Ouroboros.Network.ConnectionHandler
  ( Handle (..)
  , HandleWithExpandedCtx
  , HandleWithMinimalCtx
  , HandlerError (..)
  , classifyHandlerError
  , MkMuxConnectionHandler (..)
  , MuxConnectionHandler
  , makeConnectionHandler
  , MuxConnectionManager
  , ConnectionManagerWithExpandedCtx
    -- * tracing
  , ConnectionHandlerTrace (..)
  ) where

import Control.Applicative (Alternative)
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Exception (SomeAsyncException)
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadThrow hiding (handle)
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer, traceWith)

import Data.ByteString.Lazy (ByteString)
import Data.Map (Map)
import Data.Maybe.Strict
import Data.Text (Text)
import Data.Typeable (Typeable)

import Network.Mux (Mux)
import Network.Mux qualified as Mx
import Network.Mux.Trace

import Ouroboros.Network.ConnectionId (ConnectionId (..))
import Ouroboros.Network.ConnectionManager.Types
import Ouroboros.Network.Context (ExpandedInitiatorContext,
           MinimalInitiatorContext, ResponderContext)
import Ouroboros.Network.ControlMessage (ControlMessage (..))
import Ouroboros.Network.InboundGovernor.State
import Ouroboros.Network.Mux
import Ouroboros.Network.MuxMode
import Ouroboros.Network.Protocol.Handshake
import Ouroboros.Network.Protocol.Handshake.Version qualified as Handshake
import Ouroboros.Network.RethrowPolicy


-- | We place an upper limit of `30s` on the time we wait on receiving an SDU.
-- There is no upper bound on the time we wait when waiting for a new SDU.
-- This makes it possible for mini-protocols to use timeouts that are larger
-- than 30s or wait forever.  `30s` for receiving an SDU corresponds to
-- a minimum speed limit of 17kbps.
--
-- ( 8      -- mux header length
-- + 0xffff -- maximum SDU payload
-- )
-- * 8
-- = 524_344 -- maximum bits in an SDU
--
--  524_344 / 30 / 1024 = 17kbps
--
sduTimeout :: DiffTime
sduTimeout :: DiffTime
sduTimeout = DiffTime
30


-- | For handshake, we put a limit of `10s` for sending or receiving a single
-- `MuxSDU`.
--
sduHandshakeTimeout :: DiffTime
sduHandshakeTimeout :: DiffTime
sduHandshakeTimeout = DiffTime
10


-- | States of the connection handler thread.
--
-- * 'MuxRunning' - successful Handshake, mux started
-- * 'HandleHandshakeClientError'
--                - the connection handler thread was running client side
--                of the handshake negotiation, which failed with
--                a 'HandshakeException'
-- * 'HandleHandshakeServerError'
--                - the connection handler thread was running server side of the
--                handshake protocol, which fail with 'HandshakeException'
-- * 'HandlerError'
--                - the multiplexer thrown 'MuxError'.
--
data Handle (muxMode :: Mx.Mode) initiatorCtx responderCtx versionData bytes m a b =
    Handle {
        forall (muxMode :: Mode) initiatorCtx responderCtx versionData
       bytes (m :: * -> *) a b.
Handle muxMode initiatorCtx responderCtx versionData bytes m a b
-> Mux muxMode m
hMux            :: !(Mux muxMode m),
        forall (muxMode :: Mode) initiatorCtx responderCtx versionData
       bytes (m :: * -> *) a b.
Handle muxMode initiatorCtx responderCtx versionData bytes m a b
-> OuroborosBundle muxMode initiatorCtx responderCtx bytes m a b
hMuxBundle      :: !(OuroborosBundle muxMode initiatorCtx responderCtx bytes m a b),
        forall (muxMode :: Mode) initiatorCtx responderCtx versionData
       bytes (m :: * -> *) a b.
Handle muxMode initiatorCtx responderCtx versionData bytes m a b
-> TemperatureBundle (StrictTVar m ControlMessage)
hControlMessage :: !(TemperatureBundle (StrictTVar m ControlMessage)),
        forall (muxMode :: Mode) initiatorCtx responderCtx versionData
       bytes (m :: * -> *) a b.
Handle muxMode initiatorCtx responderCtx versionData bytes m a b
-> versionData
hVersionData    :: !versionData
      }

data MkMuxConnectionHandler (muxMode :: Mx.Mode) socket initiatorCtx responderCtx
                            peerAddr versionNumber versionData bytes m a b where
  MuxInitiatorConnectionHandler :: MkMuxConnectionHandler
                                     Mx.InitiatorMode socket initiatorCtx responderCtx
                                     peerAddr versionNumber versionData bytes m a b
  MuxResponderConnectionHandler :: (   StrictTVar m (StrictMaybe ResponderCounters)
                                    -> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
                                -> MkMuxConnectionHandler
                                     Mx.ResponderMode socket initiatorCtx responderCtx
                                     peerAddr versionNumber versionData bytes m a b
  MuxInitiatorResponderConnectionHandler
    :: (versionData -> DataFlow)
    -> (   StrictTVar m (StrictMaybe ResponderCounters)
        -> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
    -> MkMuxConnectionHandler Mx.InitiatorResponderMode socket initiatorCtx responderCtx peerAddr
                              versionNumber versionData bytes m a b

-- | 'Handle' used by `node-to-node` P2P connections.
--
type HandleWithExpandedCtx muxMode peerAddr versionData bytes m a b =
     Handle    muxMode (ExpandedInitiatorContext peerAddr m)
                       (ResponderContext peerAddr)
                       versionData bytes m a b

-- | 'Handle' used by:
--
-- * `node-to-node` non P2P mode;
-- * `node-to-client` connections.
--
type HandleWithMinimalCtx muxMode peerAddr versionData bytes m a b =
     Handle       muxMode (MinimalInitiatorContext peerAddr)
                          (ResponderContext peerAddr)
                          versionData bytes m a b

-- | A connection handler error.
--
-- It is returned either when creating the `Handle` or raised by the connection
-- handler.
--
data HandlerError versionNumber =
    -- | A handshake exception when creating `Handle`.
    HandleHandshakeClientError !(HandshakeException versionNumber)

    -- | A handshake exception when creating `Handle`.
  | HandleHandshakeServerError !(HandshakeException versionNumber)

    -- | A connection handler exception (e.g. might be a mini-protocol error, io
    -- exception, etc).
  | HandlerError !SomeException
  deriving Int -> HandlerError versionNumber -> ShowS
[HandlerError versionNumber] -> ShowS
HandlerError versionNumber -> String
(Int -> HandlerError versionNumber -> ShowS)
-> (HandlerError versionNumber -> String)
-> ([HandlerError versionNumber] -> ShowS)
-> Show (HandlerError versionNumber)
forall versionNumber.
Show versionNumber =>
Int -> HandlerError versionNumber -> ShowS
forall versionNumber.
Show versionNumber =>
[HandlerError versionNumber] -> ShowS
forall versionNumber.
Show versionNumber =>
HandlerError versionNumber -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: forall versionNumber.
Show versionNumber =>
Int -> HandlerError versionNumber -> ShowS
showsPrec :: Int -> HandlerError versionNumber -> ShowS
$cshow :: forall versionNumber.
Show versionNumber =>
HandlerError versionNumber -> String
show :: HandlerError versionNumber -> String
$cshowList :: forall versionNumber.
Show versionNumber =>
[HandlerError versionNumber] -> ShowS
showList :: [HandlerError versionNumber] -> ShowS
Show

instance ( Typeable versionNumber
         , Show versionNumber
         )
      => Exception (HandlerError versionNumber) where
    displayException :: HandlerError versionNumber -> String
displayException (HandleHandshakeClientError HandshakeException versionNumber
err) = String
"handshake client error: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ HandshakeException versionNumber -> String
forall a. Show a => a -> String
show HandshakeException versionNumber
err
    displayException (HandleHandshakeServerError HandshakeException versionNumber
err) = String
"handshake server error: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ HandshakeException versionNumber -> String
forall a. Show a => a -> String
show HandshakeException versionNumber
err
    displayException (HandlerError SomeException
err)                = String
"connection handler error: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ SomeException -> String
forall a. Show a => a -> String
show SomeException
err


classifyHandlerError :: HandlerError versionNumber
                     -> HandlerErrorType
classifyHandlerError :: forall versionNumber.
HandlerError versionNumber -> HandlerErrorType
classifyHandlerError (HandleHandshakeClientError (HandshakeProtocolLimit ProtocolLimitFailure
_)) =
    HandlerErrorType
HandshakeProtocolViolation
-- TODO: 'HandshakeProtocolError' is not a protocol error! It is just
-- a negotiation failure.  It should be renamed.
classifyHandlerError (HandleHandshakeClientError (HandshakeProtocolError HandshakeProtocolError versionNumber
_)) =
    HandlerErrorType
HandshakeFailure
classifyHandlerError (HandleHandshakeServerError (HandshakeProtocolLimit ProtocolLimitFailure
_)) =
    HandlerErrorType
HandshakeProtocolViolation
classifyHandlerError (HandleHandshakeServerError (HandshakeProtocolError HandshakeProtocolError versionNumber
_)) =
    HandlerErrorType
HandshakeFailure
-- any other exception, e.g. MuxError \/ IOError, codec errors, etc.
classifyHandlerError (HandlerError SomeException
_) =
    HandlerErrorType
HandshakeProtocolViolation


-- | Type of 'ConnectionHandler' implemented in this module.
--
type MuxConnectionHandler muxMode socket initiatorCtx responderCtx peerAddr versionNumber versionData bytes m a b =
    ConnectionHandler muxMode
                      (ConnectionHandlerTrace versionNumber versionData)
                      socket
                      peerAddr
                      (Handle muxMode initiatorCtx responderCtx versionData bytes m a b)
                      (HandlerError versionNumber)
                      versionNumber
                      versionData
                      m

-- | Type alias for 'ConnectionManager' using 'Handle'.
--
type MuxConnectionManager muxMode socket initiatorCtx responderCtx peerAddr versionData versionNumber bytes m a b =
    ConnectionManager muxMode socket peerAddr
                      (Handle muxMode initiatorCtx responderCtx versionData bytes m a b)
                      (HandlerError versionNumber)
                      m

-- | Type alias for 'ConnectionManager' which is using expanded context.
--
type ConnectionManagerWithExpandedCtx muxMode socket peerAddr versionData versionNumber bytes m a b =
    ConnectionManager muxMode socket peerAddr
                      (HandleWithExpandedCtx muxMode peerAddr versionData bytes m a b)
                      (HandlerError versionNumber)
                      m

-- | To be used as `makeConnectionHandler` field of 'ConnectionManagerArguments'.
--
-- Note: We need to pass `MiniProtocolBundle` what forces us to have two
-- different `ConnectionManager`s: one for `node-to-client` and another for
-- `node-to-node` connections.  But this is ok, as these resources are
-- independent.
-- When a server is running, the inbound governor creates a tracer which is passed here,
-- and the connection handler appends it to the muxer tracer for
-- inbound and (negotiated) outbound duplex connections. This tracer
-- efficiently informs the IG loop of miniprotocol activity.
--
makeConnectionHandler
    :: forall initiatorCtx responderCtx peerAddr muxMode socket versionNumber versionData m a b.
       ( Alternative (STM m)
       , MonadAsync m
       , MonadDelay m
       , MonadFork  m
       , MonadLabelledSTM m
       , MonadThrow (STM m)
       , MonadTimer m
       , MonadMask  m
       , Ord      versionNumber
       , Show     peerAddr
       , Typeable peerAddr
       )
    => Mx.TracersWithBearer (ConnectionId peerAddr) m
    -> ForkPolicy peerAddr
    -> HandshakeArguments (ConnectionId peerAddr) versionNumber versionData m
    -> Versions versionNumber versionData
                (OuroborosBundle muxMode initiatorCtx responderCtx ByteString m a b)
    -> (ThreadId m, RethrowPolicy)
    -- ^ 'ThreadId' and rethrow policy.  Rethrow policy might throw an async
    -- exception to that thread, when trying to terminate the process.
    -> MkMuxConnectionHandler muxMode socket initiatorCtx responderCtx peerAddr versionNumber versionData ByteString m a b
    -> MuxConnectionHandler muxMode socket initiatorCtx responderCtx peerAddr
                            versionNumber versionData ByteString m a b
makeConnectionHandler :: forall initiatorCtx responderCtx peerAddr (muxMode :: Mode) socket
       versionNumber versionData (m :: * -> *) a b.
(Alternative (STM m), MonadAsync m, MonadDelay m, MonadFork m,
 MonadLabelledSTM m, MonadThrow (STM m), MonadTimer m, MonadMask m,
 Ord versionNumber, Show peerAddr, Typeable peerAddr) =>
TracersWithBearer (ConnectionId peerAddr) m
-> ForkPolicy peerAddr
-> HandshakeArguments
     (ConnectionId peerAddr) versionNumber versionData m
-> Versions
     versionNumber
     versionData
     (OuroborosBundle
        muxMode initiatorCtx responderCtx ByteString m a b)
-> (ThreadId m, RethrowPolicy)
-> MkMuxConnectionHandler
     muxMode
     socket
     initiatorCtx
     responderCtx
     peerAddr
     versionNumber
     versionData
     ByteString
     m
     a
     b
-> MuxConnectionHandler
     muxMode
     socket
     initiatorCtx
     responderCtx
     peerAddr
     versionNumber
     versionData
     ByteString
     m
     a
     b
makeConnectionHandler TracersWithBearer (ConnectionId peerAddr) m
muxTracers ForkPolicy peerAddr
forkPolicy
                      HandshakeArguments
  (ConnectionId peerAddr) versionNumber versionData m
handshakeArguments
                      Versions
  versionNumber
  versionData
  (OuroborosBundle
     muxMode initiatorCtx responderCtx ByteString m a b)
versionedApplication
                      (ThreadId m
mainThreadId, RethrowPolicy
rethrowPolicy) =
  \case
    MkMuxConnectionHandler
  muxMode
  socket
  initiatorCtx
  responderCtx
  peerAddr
  versionNumber
  versionData
  ByteString
  m
  a
  b
MuxInitiatorConnectionHandler ->
      WithMuxTuple
  muxMode
  (ConnectionHandlerFn
     (ConnectionHandlerTrace versionNumber versionData)
     socket
     peerAddr
     (Handle
        muxMode initiatorCtx responderCtx versionData ByteString m a b)
     (HandlerError versionNumber)
     versionNumber
     versionData
     m)
-> MuxConnectionHandler
     muxMode
     socket
     initiatorCtx
     responderCtx
     peerAddr
     versionNumber
     versionData
     ByteString
     m
     a
     b
forall (muxMode :: Mode) handlerTrace socket peerAddr handle
       handleError versionNumber versionData (m :: * -> *).
WithMuxTuple
  muxMode
  (ConnectionHandlerFn
     handlerTrace
     socket
     peerAddr
     handle
     handleError
     versionNumber
     versionData
     m)
-> ConnectionHandler
     muxMode
     handlerTrace
     socket
     peerAddr
     handle
     handleError
     versionNumber
     versionData
     m
ConnectionHandler (WithMuxTuple
   muxMode
   (ConnectionHandlerFn
      (ConnectionHandlerTrace versionNumber versionData)
      socket
      peerAddr
      (Handle
         muxMode initiatorCtx responderCtx versionData ByteString m a b)
      (HandlerError versionNumber)
      versionNumber
      versionData
      m)
 -> MuxConnectionHandler
      muxMode
      socket
      initiatorCtx
      responderCtx
      peerAddr
      versionNumber
      versionData
      ByteString
      m
      a
      b)
-> (ConnectionHandlerFn
      (ConnectionHandlerTrace versionNumber versionData)
      socket
      peerAddr
      (Handle
         muxMode initiatorCtx responderCtx versionData ByteString m a b)
      (HandlerError versionNumber)
      versionNumber
      versionData
      m
    -> WithMuxTuple
         muxMode
         (ConnectionHandlerFn
            (ConnectionHandlerTrace versionNumber versionData)
            socket
            peerAddr
            (Handle
               muxMode initiatorCtx responderCtx versionData ByteString m a b)
            (HandlerError versionNumber)
            versionNumber
            versionData
            m))
-> ConnectionHandlerFn
     (ConnectionHandlerTrace versionNumber versionData)
     socket
     peerAddr
     (Handle
        muxMode initiatorCtx responderCtx versionData ByteString m a b)
     (HandlerError versionNumber)
     versionNumber
     versionData
     m
-> MuxConnectionHandler
     muxMode
     socket
     initiatorCtx
     responderCtx
     peerAddr
     versionNumber
     versionData
     ByteString
     m
     a
     b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionHandlerFn
  (ConnectionHandlerTrace versionNumber versionData)
  socket
  peerAddr
  (Handle
     muxMode initiatorCtx responderCtx versionData ByteString m a b)
  (HandlerError versionNumber)
  versionNumber
  versionData
  m
-> WithMuxTuple
     muxMode
     (ConnectionHandlerFn
        (ConnectionHandlerTrace versionNumber versionData)
        socket
        peerAddr
        (Handle
           muxMode initiatorCtx responderCtx versionData ByteString m a b)
        (HandlerError versionNumber)
        versionNumber
        versionData
        m)
ConnectionHandlerFn
  (ConnectionHandlerTrace versionNumber versionData)
  socket
  peerAddr
  (Handle
     muxMode initiatorCtx responderCtx versionData ByteString m a b)
  (HandlerError versionNumber)
  versionNumber
  versionData
  m
-> WithMuxMode
     'InitiatorMode
     (ConnectionHandlerFn
        (ConnectionHandlerTrace versionNumber versionData)
        socket
        peerAddr
        (Handle
           muxMode initiatorCtx responderCtx versionData ByteString m a b)
        (HandlerError versionNumber)
        versionNumber
        versionData
        m)
     (ConnectionHandlerFn
        (ConnectionHandlerTrace versionNumber versionData)
        socket
        peerAddr
        (Handle
           muxMode initiatorCtx responderCtx versionData ByteString m a b)
        (HandlerError versionNumber)
        versionNumber
        versionData
        m)
forall a b. a -> WithMuxMode 'InitiatorMode a b
WithInitiatorMode
      (ConnectionHandlerFn
   (ConnectionHandlerTrace versionNumber versionData)
   socket
   peerAddr
   (Handle
      muxMode initiatorCtx responderCtx versionData ByteString m a b)
   (HandlerError versionNumber)
   versionNumber
   versionData
   m
 -> MuxConnectionHandler
      muxMode
      socket
      initiatorCtx
      responderCtx
      peerAddr
      versionNumber
      versionData
      ByteString
      m
      a
      b)
-> ConnectionHandlerFn
     (ConnectionHandlerTrace versionNumber versionData)
     socket
     peerAddr
     (Handle
        muxMode initiatorCtx responderCtx versionData ByteString m a b)
     (HandlerError versionNumber)
     versionNumber
     versionData
     m
-> MuxConnectionHandler
     muxMode
     socket
     initiatorCtx
     responderCtx
     peerAddr
     versionNumber
     versionData
     ByteString
     m
     a
     b
forall a b. (a -> b) -> a -> b
$ InResponderMode
  muxMode
  (StrictTVar m (StrictMaybe ResponderCounters)
   -> Tracer m (WithBearer (ConnectionId peerAddr) Trace),
   versionData -> DataFlow)
-> ConnectionHandlerFn
     (ConnectionHandlerTrace versionNumber versionData)
     socket
     peerAddr
     (Handle
        muxMode initiatorCtx responderCtx versionData ByteString m a b)
     (HandlerError versionNumber)
     versionNumber
     versionData
     m
outboundConnectionHandler InResponderMode
  muxMode
  (StrictTVar m (StrictMaybe ResponderCounters)
   -> Tracer m (WithBearer (ConnectionId peerAddr) Trace),
   versionData -> DataFlow)
forall (mode :: Mode) a. InResponderMode mode a
NotInResponderMode
    MuxResponderConnectionHandler StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace)
inboundGovernorMuxTracer ->
      WithMuxTuple
  muxMode
  (ConnectionHandlerFn
     (ConnectionHandlerTrace versionNumber versionData)
     socket
     peerAddr
     (Handle
        muxMode initiatorCtx responderCtx versionData ByteString m a b)
     (HandlerError versionNumber)
     versionNumber
     versionData
     m)
-> MuxConnectionHandler
     muxMode
     socket
     initiatorCtx
     responderCtx
     peerAddr
     versionNumber
     versionData
     ByteString
     m
     a
     b
forall (muxMode :: Mode) handlerTrace socket peerAddr handle
       handleError versionNumber versionData (m :: * -> *).
WithMuxTuple
  muxMode
  (ConnectionHandlerFn
     handlerTrace
     socket
     peerAddr
     handle
     handleError
     versionNumber
     versionData
     m)
-> ConnectionHandler
     muxMode
     handlerTrace
     socket
     peerAddr
     handle
     handleError
     versionNumber
     versionData
     m
ConnectionHandler (WithMuxTuple
   muxMode
   (ConnectionHandlerFn
      (ConnectionHandlerTrace versionNumber versionData)
      socket
      peerAddr
      (Handle
         muxMode initiatorCtx responderCtx versionData ByteString m a b)
      (HandlerError versionNumber)
      versionNumber
      versionData
      m)
 -> MuxConnectionHandler
      muxMode
      socket
      initiatorCtx
      responderCtx
      peerAddr
      versionNumber
      versionData
      ByteString
      m
      a
      b)
-> ((StrictTVar m (StrictMaybe ResponderCounters)
     -> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
    -> WithMuxTuple
         muxMode
         (ConnectionHandlerFn
            (ConnectionHandlerTrace versionNumber versionData)
            socket
            peerAddr
            (Handle
               muxMode initiatorCtx responderCtx versionData ByteString m a b)
            (HandlerError versionNumber)
            versionNumber
            versionData
            m))
-> (StrictTVar m (StrictMaybe ResponderCounters)
    -> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
-> MuxConnectionHandler
     muxMode
     socket
     initiatorCtx
     responderCtx
     peerAddr
     versionNumber
     versionData
     ByteString
     m
     a
     b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionHandlerFn
  (ConnectionHandlerTrace versionNumber versionData)
  socket
  peerAddr
  (Handle
     muxMode initiatorCtx responderCtx versionData ByteString m a b)
  (HandlerError versionNumber)
  versionNumber
  versionData
  m
-> WithMuxTuple
     muxMode
     (ConnectionHandlerFn
        (ConnectionHandlerTrace versionNumber versionData)
        socket
        peerAddr
        (Handle
           muxMode initiatorCtx responderCtx versionData ByteString m a b)
        (HandlerError versionNumber)
        versionNumber
        versionData
        m)
ConnectionHandlerFn
  (ConnectionHandlerTrace versionNumber versionData)
  socket
  peerAddr
  (Handle
     muxMode initiatorCtx responderCtx versionData ByteString m a b)
  (HandlerError versionNumber)
  versionNumber
  versionData
  m
-> WithMuxMode
     'ResponderMode
     (ConnectionHandlerFn
        (ConnectionHandlerTrace versionNumber versionData)
        socket
        peerAddr
        (Handle
           muxMode initiatorCtx responderCtx versionData ByteString m a b)
        (HandlerError versionNumber)
        versionNumber
        versionData
        m)
     (ConnectionHandlerFn
        (ConnectionHandlerTrace versionNumber versionData)
        socket
        peerAddr
        (Handle
           muxMode initiatorCtx responderCtx versionData ByteString m a b)
        (HandlerError versionNumber)
        versionNumber
        versionData
        m)
forall b a. b -> WithMuxMode 'ResponderMode a b
WithResponderMode (ConnectionHandlerFn
   (ConnectionHandlerTrace versionNumber versionData)
   socket
   peerAddr
   (Handle
      muxMode initiatorCtx responderCtx versionData ByteString m a b)
   (HandlerError versionNumber)
   versionNumber
   versionData
   m
 -> WithMuxTuple
      muxMode
      (ConnectionHandlerFn
         (ConnectionHandlerTrace versionNumber versionData)
         socket
         peerAddr
         (Handle
            muxMode initiatorCtx responderCtx versionData ByteString m a b)
         (HandlerError versionNumber)
         versionNumber
         versionData
         m))
-> ((StrictTVar m (StrictMaybe ResponderCounters)
     -> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
    -> ConnectionHandlerFn
         (ConnectionHandlerTrace versionNumber versionData)
         socket
         peerAddr
         (Handle
            muxMode initiatorCtx responderCtx versionData ByteString m a b)
         (HandlerError versionNumber)
         versionNumber
         versionData
         m)
-> (StrictTVar m (StrictMaybe ResponderCounters)
    -> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
-> WithMuxTuple
     muxMode
     (ConnectionHandlerFn
        (ConnectionHandlerTrace versionNumber versionData)
        socket
        peerAddr
        (Handle
           muxMode initiatorCtx responderCtx versionData ByteString m a b)
        (HandlerError versionNumber)
        versionNumber
        versionData
        m)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (StrictTVar m (StrictMaybe ResponderCounters)
 -> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
-> ConnectionHandlerFn
     (ConnectionHandlerTrace versionNumber versionData)
     socket
     peerAddr
     (Handle
        muxMode initiatorCtx responderCtx versionData ByteString m a b)
     (HandlerError versionNumber)
     versionNumber
     versionData
     m
inboundConnectionHandler ((StrictTVar m (StrictMaybe ResponderCounters)
  -> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
 -> MuxConnectionHandler
      muxMode
      socket
      initiatorCtx
      responderCtx
      peerAddr
      versionNumber
      versionData
      ByteString
      m
      a
      b)
-> (StrictTVar m (StrictMaybe ResponderCounters)
    -> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
-> MuxConnectionHandler
     muxMode
     socket
     initiatorCtx
     responderCtx
     peerAddr
     versionNumber
     versionData
     ByteString
     m
     a
     b
forall a b. (a -> b) -> a -> b
$ StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace)
inboundGovernorMuxTracer
    MuxInitiatorResponderConnectionHandler versionData -> DataFlow
connectionDataFlow StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace)
inboundGovernorMuxTracer ->
      WithMuxTuple
  muxMode
  (ConnectionHandlerFn
     (ConnectionHandlerTrace versionNumber versionData)
     socket
     peerAddr
     (Handle
        muxMode initiatorCtx responderCtx versionData ByteString m a b)
     (HandlerError versionNumber)
     versionNumber
     versionData
     m)
-> MuxConnectionHandler
     muxMode
     socket
     initiatorCtx
     responderCtx
     peerAddr
     versionNumber
     versionData
     ByteString
     m
     a
     b
forall (muxMode :: Mode) handlerTrace socket peerAddr handle
       handleError versionNumber versionData (m :: * -> *).
WithMuxTuple
  muxMode
  (ConnectionHandlerFn
     handlerTrace
     socket
     peerAddr
     handle
     handleError
     versionNumber
     versionData
     m)
-> ConnectionHandler
     muxMode
     handlerTrace
     socket
     peerAddr
     handle
     handleError
     versionNumber
     versionData
     m
ConnectionHandler (WithMuxTuple
   muxMode
   (ConnectionHandlerFn
      (ConnectionHandlerTrace versionNumber versionData)
      socket
      peerAddr
      (Handle
         muxMode initiatorCtx responderCtx versionData ByteString m a b)
      (HandlerError versionNumber)
      versionNumber
      versionData
      m)
 -> MuxConnectionHandler
      muxMode
      socket
      initiatorCtx
      responderCtx
      peerAddr
      versionNumber
      versionData
      ByteString
      m
      a
      b)
-> WithMuxTuple
     muxMode
     (ConnectionHandlerFn
        (ConnectionHandlerTrace versionNumber versionData)
        socket
        peerAddr
        (Handle
           muxMode initiatorCtx responderCtx versionData ByteString m a b)
        (HandlerError versionNumber)
        versionNumber
        versionData
        m)
-> MuxConnectionHandler
     muxMode
     socket
     initiatorCtx
     responderCtx
     peerAddr
     versionNumber
     versionData
     ByteString
     m
     a
     b
forall a b. (a -> b) -> a -> b
$ ConnectionHandlerFn
  (ConnectionHandlerTrace versionNumber versionData)
  socket
  peerAddr
  (Handle
     muxMode initiatorCtx responderCtx versionData ByteString m a b)
  (HandlerError versionNumber)
  versionNumber
  versionData
  m
-> ConnectionHandlerFn
     (ConnectionHandlerTrace versionNumber versionData)
     socket
     peerAddr
     (Handle
        muxMode initiatorCtx responderCtx versionData ByteString m a b)
     (HandlerError versionNumber)
     versionNumber
     versionData
     m
-> WithMuxMode
     'InitiatorResponderMode
     (ConnectionHandlerFn
        (ConnectionHandlerTrace versionNumber versionData)
        socket
        peerAddr
        (Handle
           muxMode initiatorCtx responderCtx versionData ByteString m a b)
        (HandlerError versionNumber)
        versionNumber
        versionData
        m)
     (ConnectionHandlerFn
        (ConnectionHandlerTrace versionNumber versionData)
        socket
        peerAddr
        (Handle
           muxMode initiatorCtx responderCtx versionData ByteString m a b)
        (HandlerError versionNumber)
        versionNumber
        versionData
        m)
forall a b. a -> b -> WithMuxMode 'InitiatorResponderMode a b
WithInitiatorResponderMode
        (InResponderMode
  muxMode
  (StrictTVar m (StrictMaybe ResponderCounters)
   -> Tracer m (WithBearer (ConnectionId peerAddr) Trace),
   versionData -> DataFlow)
-> ConnectionHandlerFn
     (ConnectionHandlerTrace versionNumber versionData)
     socket
     peerAddr
     (Handle
        muxMode initiatorCtx responderCtx versionData ByteString m a b)
     (HandlerError versionNumber)
     versionNumber
     versionData
     m
outboundConnectionHandler (InResponderMode
   muxMode
   (StrictTVar m (StrictMaybe ResponderCounters)
    -> Tracer m (WithBearer (ConnectionId peerAddr) Trace),
    versionData -> DataFlow)
 -> ConnectionHandlerFn
      (ConnectionHandlerTrace versionNumber versionData)
      socket
      peerAddr
      (Handle
         muxMode initiatorCtx responderCtx versionData ByteString m a b)
      (HandlerError versionNumber)
      versionNumber
      versionData
      m)
-> InResponderMode
     muxMode
     (StrictTVar m (StrictMaybe ResponderCounters)
      -> Tracer m (WithBearer (ConnectionId peerAddr) Trace),
      versionData -> DataFlow)
-> ConnectionHandlerFn
     (ConnectionHandlerTrace versionNumber versionData)
     socket
     peerAddr
     (Handle
        muxMode initiatorCtx responderCtx versionData ByteString m a b)
     (HandlerError versionNumber)
     versionNumber
     versionData
     m
forall a b. (a -> b) -> a -> b
$ (StrictTVar m (StrictMaybe ResponderCounters)
 -> Tracer m (WithBearer (ConnectionId peerAddr) Trace),
 versionData -> DataFlow)
-> InResponderMode
     muxMode
     (StrictTVar m (StrictMaybe ResponderCounters)
      -> Tracer m (WithBearer (ConnectionId peerAddr) Trace),
      versionData -> DataFlow)
forall (mode :: Mode) a.
(HasResponder mode ~ 'True) =>
a -> InResponderMode mode a
InResponderMode (StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace)
inboundGovernorMuxTracer, versionData -> DataFlow
connectionDataFlow))
        ((StrictTVar m (StrictMaybe ResponderCounters)
 -> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
-> ConnectionHandlerFn
     (ConnectionHandlerTrace versionNumber versionData)
     socket
     peerAddr
     (Handle
        muxMode initiatorCtx responderCtx versionData ByteString m a b)
     (HandlerError versionNumber)
     versionNumber
     versionData
     m
inboundConnectionHandler StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace)
inboundGovernorMuxTracer)
  where
    -- install classify exception handler
    classifyExceptions :: forall x.
                          Tracer m (ConnectionHandlerTrace versionNumber versionData)
                       -> peerAddr
                       -> ErrorContext
                       -> m x -> m x
    classifyExceptions :: forall x.
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> peerAddr -> ErrorContext -> m x -> m x
classifyExceptions Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer peerAddr
remoteAddress ErrorContext
ctx m x
io =
      -- handle non-async exceptions
      (SomeException -> Maybe SomeException)
-> m x -> (SomeException -> m x) -> m x
forall e b a.
Exception e =>
(e -> Maybe b) -> m a -> (b -> m a) -> m a
forall (m :: * -> *) e b a.
(MonadCatch m, Exception e) =>
(e -> Maybe b) -> m a -> (b -> m a) -> m a
catchJust
        (\SomeException
e -> case SomeException -> Maybe SomeAsyncException
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e :: Maybe SomeAsyncException of
                Just SomeAsyncException
_  -> Maybe SomeException
forall a. Maybe a
Nothing
                Maybe SomeAsyncException
Nothing -> SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just SomeException
e)
        m x
io
        ((SomeException -> m x) -> m x) -> (SomeException -> m x) -> m x
forall a b. (a -> b) -> a -> b
$ \SomeException
err -> do
          let cmd :: ErrorCommand
cmd = RethrowPolicy -> ErrorContext -> SomeException -> ErrorCommand
runRethrowPolicy RethrowPolicy
rethrowPolicy ErrorContext
ctx SomeException
err
          Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> ConnectionHandlerTrace versionNumber versionData -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer (ErrorContext
-> SomeException
-> ErrorCommand
-> ConnectionHandlerTrace versionNumber versionData
forall versionNumber versionData.
ErrorContext
-> SomeException
-> ErrorCommand
-> ConnectionHandlerTrace versionNumber versionData
TrConnectionHandlerError ErrorContext
ctx SomeException
err ErrorCommand
cmd)
          case ErrorCommand
cmd of
            ErrorCommand
ShutdownNode -> do
              ThreadId m -> ExceptionInHandler -> m ()
forall e. Exception e => ThreadId m -> e -> m ()
forall (m :: * -> *) e.
(MonadFork m, Exception e) =>
ThreadId m -> e -> m ()
throwTo ThreadId m
mainThreadId (peerAddr -> SomeException -> ExceptionInHandler
forall peerAddr.
(Typeable peerAddr, Show peerAddr) =>
peerAddr -> SomeException -> ExceptionInHandler
ExceptionInHandler peerAddr
remoteAddress SomeException
err)
              SomeException -> m x
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SomeException
err
            ErrorCommand
ShutdownPeer ->
              ExceptionInHandler -> m x
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (peerAddr -> SomeException -> ExceptionInHandler
forall peerAddr.
(Typeable peerAddr, Show peerAddr) =>
peerAddr -> SomeException -> ExceptionInHandler
ExceptionInHandler peerAddr
remoteAddress SomeException
err)

    outboundConnectionHandler
      :: InResponderMode muxMode (   StrictTVar m (StrictMaybe ResponderCounters)
                                  -> Tracer m (WithBearer (ConnectionId peerAddr) Trace)
                                 , versionData -> DataFlow)
      -> ConnectionHandlerFn (ConnectionHandlerTrace versionNumber versionData)
                             socket
                             peerAddr
                             (Handle muxMode initiatorCtx responderCtx versionData ByteString m a b)
                             (HandlerError versionNumber)
                             versionNumber
                             versionData
                             m
    outboundConnectionHandler :: InResponderMode
  muxMode
  (StrictTVar m (StrictMaybe ResponderCounters)
   -> Tracer m (WithBearer (ConnectionId peerAddr) Trace),
   versionData -> DataFlow)
-> ConnectionHandlerFn
     (ConnectionHandlerTrace versionNumber versionData)
     socket
     peerAddr
     (Handle
        muxMode initiatorCtx responderCtx versionData ByteString m a b)
     (HandlerError versionNumber)
     versionNumber
     versionData
     m
outboundConnectionHandler InResponderMode
  muxMode
  (StrictTVar m (StrictMaybe ResponderCounters)
   -> Tracer m (WithBearer (ConnectionId peerAddr) Trace),
   versionData -> DataFlow)
inResponderMode
                              versionData -> versionData
versionDataFn
                              socket
socket
                              PromiseWriter { Either
  (HandlerError versionNumber)
  (HandshakeConnectionResult
     (Handle
        muxMode initiatorCtx responderCtx versionData ByteString m a b)
     (versionNumber, versionData))
-> STM m ()
writePromise :: Either
  (HandlerError versionNumber)
  (HandshakeConnectionResult
     (Handle
        muxMode initiatorCtx responderCtx versionData ByteString m a b)
     (versionNumber, versionData))
-> STM m ()
writePromise :: forall (m :: * -> *) a. PromiseWriter m a -> a -> STM m ()
writePromise }
                              Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer
                              connectionId :: ConnectionId peerAddr
connectionId@ConnectionId { peerAddr
localAddress :: peerAddr
localAddress :: forall addr. ConnectionId addr -> addr
localAddress
                                                        , peerAddr
remoteAddress :: peerAddr
remoteAddress :: forall addr. ConnectionId addr -> addr
remoteAddress }
                              DiffTime -> socket -> Maybe (ReadBuffer m) -> m (Bearer m)
mkMuxBearer
                              (Maybe (ReadBuffer m) -> m ()) -> m ()
withBuffer
        = MaskedAction { (forall x. m x -> m x) -> m ()
runWithUnmask :: (forall x. m x -> m x) -> m ()
runWithUnmask :: (forall x. m x -> m x) -> m ()
runWithUnmask }
      where
        runWithUnmask :: (forall x. m x -> m x) -> m ()
        runWithUnmask :: (forall x. m x -> m x) -> m ()
runWithUnmask forall x. m x -> m x
unmask =
          Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> peerAddr -> ErrorContext -> m () -> m ()
forall x.
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> peerAddr -> ErrorContext -> m x -> m x
classifyExceptions Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer peerAddr
remoteAddress ErrorContext
OutboundError (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
            String -> m ()
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread ([String] -> String
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [String
"out-conn-hndlr-"
                                    , peerAddr -> String
forall a. Show a => a -> String
show peerAddr
localAddress
                                    , String
"-"
                                    , peerAddr -> String
forall a. Show a => a -> String
show peerAddr
remoteAddress
                                    ])
            handshakeBearer <- DiffTime -> socket -> Maybe (ReadBuffer m) -> m (Bearer m)
mkMuxBearer DiffTime
sduHandshakeTimeout socket
socket Maybe (ReadBuffer m)
forall a. Maybe a
Nothing
            hsResult <-
              unmask (runHandshakeClient handshakeBearer
                                         connectionId
                                         handshakeArguments
                                         (Handshake.updateVersionData versionDataFn versionedApplication))
              -- 'runHandshakeClient' only deals with protocol limit errors or
              -- handshake negotiation failures, but not with 'IOException's or
              -- 'MuxError's.
              `catch` \(SomeException
err :: SomeException) -> do
                STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ Either
  (HandlerError versionNumber)
  (HandshakeConnectionResult
     (Handle
        muxMode initiatorCtx responderCtx versionData ByteString m a b)
     (versionNumber, versionData))
-> STM m ()
writePromise (HandlerError versionNumber
-> Either
     (HandlerError versionNumber)
     (HandshakeConnectionResult
        (Handle
           muxMode initiatorCtx responderCtx versionData ByteString m a b)
        (versionNumber, versionData))
forall a b. a -> Either a b
Left (SomeException -> HandlerError versionNumber
forall versionNumber. SomeException -> HandlerError versionNumber
HandlerError SomeException
err))
                SomeException
-> m (Either
        (HandshakeException versionNumber)
        (HandshakeResult
           (OuroborosBundle
              muxMode initiatorCtx responderCtx ByteString m a b)
           versionNumber
           versionData))
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SomeException
err
            case hsResult of
              Left !HandshakeException versionNumber
err -> do
                STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ Either
  (HandlerError versionNumber)
  (HandshakeConnectionResult
     (Handle
        muxMode initiatorCtx responderCtx versionData ByteString m a b)
     (versionNumber, versionData))
-> STM m ()
writePromise (HandlerError versionNumber
-> Either
     (HandlerError versionNumber)
     (HandshakeConnectionResult
        (Handle
           muxMode initiatorCtx responderCtx versionData ByteString m a b)
        (versionNumber, versionData))
forall a b. a -> Either a b
Left (HandshakeException versionNumber -> HandlerError versionNumber
forall versionNumber.
HandshakeException versionNumber -> HandlerError versionNumber
HandleHandshakeClientError HandshakeException versionNumber
err))
                Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> ConnectionHandlerTrace versionNumber versionData -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer (HandshakeException versionNumber
-> ConnectionHandlerTrace versionNumber versionData
forall versionNumber versionData.
HandshakeException versionNumber
-> ConnectionHandlerTrace versionNumber versionData
TrHandshakeClientError HandshakeException versionNumber
err)

              Right (HandshakeNegotiationResult OuroborosBundle muxMode initiatorCtx responderCtx ByteString m a b
app versionNumber
versionNumber versionData
agreedOptions) -> do
                Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> ConnectionHandlerTrace versionNumber versionData -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer (versionNumber
-> versionData -> ConnectionHandlerTrace versionNumber versionData
forall versionNumber versionData.
versionNumber
-> versionData -> ConnectionHandlerTrace versionNumber versionData
TrHandshakeSuccess versionNumber
versionNumber versionData
agreedOptions)
                controlMessageBundle
                  <- (\StrictTVar m ControlMessage
a StrictTVar m ControlMessage
b StrictTVar m ControlMessage
c -> WithProtocolTemperature 'Hot (StrictTVar m ControlMessage)
-> WithProtocolTemperature 'Warm (StrictTVar m ControlMessage)
-> WithProtocolTemperature
     'Established (StrictTVar m ControlMessage)
-> TemperatureBundle (StrictTVar m ControlMessage)
forall a.
WithProtocolTemperature 'Hot a
-> WithProtocolTemperature 'Warm a
-> WithProtocolTemperature 'Established a
-> TemperatureBundle a
TemperatureBundle (StrictTVar m ControlMessage
-> WithProtocolTemperature 'Hot (StrictTVar m ControlMessage)
forall a. a -> WithProtocolTemperature 'Hot a
WithHot StrictTVar m ControlMessage
a) (StrictTVar m ControlMessage
-> WithProtocolTemperature 'Warm (StrictTVar m ControlMessage)
forall a. a -> WithProtocolTemperature 'Warm a
WithWarm StrictTVar m ControlMessage
b) (StrictTVar m ControlMessage
-> WithProtocolTemperature
     'Established (StrictTVar m ControlMessage)
forall a. a -> WithProtocolTemperature 'Established a
WithEstablished StrictTVar m ControlMessage
c))
                      (StrictTVar m ControlMessage
 -> StrictTVar m ControlMessage
 -> StrictTVar m ControlMessage
 -> TemperatureBundle (StrictTVar m ControlMessage))
-> m (StrictTVar m ControlMessage)
-> m (StrictTVar m ControlMessage
      -> StrictTVar m ControlMessage
      -> TemperatureBundle (StrictTVar m ControlMessage))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ControlMessage -> m (StrictTVar m ControlMessage)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ControlMessage
Continue
                      m (StrictTVar m ControlMessage
   -> StrictTVar m ControlMessage
   -> TemperatureBundle (StrictTVar m ControlMessage))
-> m (StrictTVar m ControlMessage)
-> m (StrictTVar m ControlMessage
      -> TemperatureBundle (StrictTVar m ControlMessage))
forall a b. m (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> ControlMessage -> m (StrictTVar m ControlMessage)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ControlMessage
Continue
                      m (StrictTVar m ControlMessage
   -> TemperatureBundle (StrictTVar m ControlMessage))
-> m (StrictTVar m ControlMessage)
-> m (TemperatureBundle (StrictTVar m ControlMessage))
forall a b. m (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> ControlMessage -> m (StrictTVar m ControlMessage)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ControlMessage
Continue
                mux <- Mx.new (mkMiniProtocolInfos (runForkPolicy forkPolicy remoteAddress) app)
                let !handle = Handle {
                        hMux :: Mux muxMode m
hMux            = Mux muxMode m
mux,
                        hMuxBundle :: OuroborosBundle muxMode initiatorCtx responderCtx ByteString m a b
hMuxBundle      = OuroborosBundle muxMode initiatorCtx responderCtx ByteString m a b
app,
                        hControlMessage :: TemperatureBundle (StrictTVar m ControlMessage)
hControlMessage = TemperatureBundle (StrictTVar m ControlMessage)
controlMessageBundle,
                        hVersionData :: versionData
hVersionData    = versionData
agreedOptions
                      }
                atomically $ writePromise (Right $ HandshakeConnectionResult handle (versionNumber, agreedOptions))
                withBuffer \Maybe (ReadBuffer m)
buffer -> do
                  bearer <- DiffTime -> socket -> Maybe (ReadBuffer m) -> m (Bearer m)
mkMuxBearer DiffTime
sduTimeout socket
socket Maybe (ReadBuffer m)
buffer
                  muxTracers' <- case inResponderMode of
                    InResponderMode (StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace)
inboundGovernorMuxTracer, versionData -> DataFlow
connectionDataFlow)
                      | DataFlow
Duplex <- versionData -> DataFlow
connectionDataFlow versionData
agreedOptions -> do
                          countersVar <- StrictMaybe ResponderCounters
-> m (StrictTVar m (StrictMaybe ResponderCounters))
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO (StrictMaybe ResponderCounters
 -> m (StrictTVar m (StrictMaybe ResponderCounters)))
-> (ResponderCounters -> StrictMaybe ResponderCounters)
-> ResponderCounters
-> m (StrictTVar m (StrictMaybe ResponderCounters))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ResponderCounters -> StrictMaybe ResponderCounters
forall a. a -> StrictMaybe a
SJust (ResponderCounters
 -> m (StrictTVar m (StrictMaybe ResponderCounters)))
-> ResponderCounters
-> m (StrictTVar m (StrictMaybe ResponderCounters))
forall a b. (a -> b) -> a -> b
$ Int -> Int -> ResponderCounters
ResponderCounters Int
0 Int
0
                          pure $ Mx.tracersWithBearer connectionId muxTracers {
                              Mx.tracer = Mx.tracer muxTracers <> inboundGovernorMuxTracer countersVar
                            }
                    InResponderMode
  muxMode
  (StrictTVar m (StrictMaybe ResponderCounters)
   -> Tracer m (WithBearer (ConnectionId peerAddr) Trace),
   versionData -> DataFlow)
_notResponder ->
                          -- If this is InitiatorOnly, or a server where unidirectional flow was negotiated
                          -- the IG will never be informed of this remote for obvious reasons.
                          Tracers m -> m (Tracers m)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Tracers m -> m (Tracers m)) -> Tracers m -> m (Tracers m)
forall a b. (a -> b) -> a -> b
$ ConnectionId peerAddr
-> TracersWithBearer (ConnectionId peerAddr) m -> Tracers m
forall peerId (m :: * -> *).
peerId -> TracersWithBearer peerId m -> Tracers m
Mx.tracersWithBearer ConnectionId peerAddr
connectionId TracersWithBearer (ConnectionId peerAddr) m
muxTracers
                  unmask $ Mx.run muxTracers' mux bearer

              Right (HandshakeQueryResult Map versionNumber (Either Text versionData)
vMap) -> do
                STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ Either
  (HandlerError versionNumber)
  (HandshakeConnectionResult
     (Handle
        muxMode initiatorCtx responderCtx versionData ByteString m a b)
     (versionNumber, versionData))
-> STM m ()
writePromise (HandshakeConnectionResult
  (Handle
     muxMode initiatorCtx responderCtx versionData ByteString m a b)
  (versionNumber, versionData)
-> Either
     (HandlerError versionNumber)
     (HandshakeConnectionResult
        (Handle
           muxMode initiatorCtx responderCtx versionData ByteString m a b)
        (versionNumber, versionData))
forall a b. b -> Either a b
Right HandshakeConnectionResult
  (Handle
     muxMode initiatorCtx responderCtx versionData ByteString m a b)
  (versionNumber, versionData)
forall handle version. HandshakeConnectionResult handle version
HandshakeConnectionQuery)
                Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> ConnectionHandlerTrace versionNumber versionData -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer (ConnectionHandlerTrace versionNumber versionData -> m ())
-> ConnectionHandlerTrace versionNumber versionData -> m ()
forall a b. (a -> b) -> a -> b
$ Map versionNumber (Either Text versionData)
-> ConnectionHandlerTrace versionNumber versionData
forall versionNumber versionData.
Map versionNumber (Either Text versionData)
-> ConnectionHandlerTrace versionNumber versionData
TrHandshakeQuery Map versionNumber (Either Text versionData)
vMap


    inboundConnectionHandler
      :: (   StrictTVar m (StrictMaybe ResponderCounters)
          -> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
      -> ConnectionHandlerFn (ConnectionHandlerTrace versionNumber versionData)
                             socket
                             peerAddr
                             (Handle muxMode initiatorCtx responderCtx versionData ByteString m a b)
                             (HandlerError versionNumber)
                             versionNumber
                             versionData
                             m
    inboundConnectionHandler :: (StrictTVar m (StrictMaybe ResponderCounters)
 -> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
-> ConnectionHandlerFn
     (ConnectionHandlerTrace versionNumber versionData)
     socket
     peerAddr
     (Handle
        muxMode initiatorCtx responderCtx versionData ByteString m a b)
     (HandlerError versionNumber)
     versionNumber
     versionData
     m
inboundConnectionHandler StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace)
inboundGovernorMuxTracer
                             versionData -> versionData
updateVersionDataFn
                             socket
socket
                             PromiseWriter { Either
  (HandlerError versionNumber)
  (HandshakeConnectionResult
     (Handle
        muxMode initiatorCtx responderCtx versionData ByteString m a b)
     (versionNumber, versionData))
-> STM m ()
writePromise :: forall (m :: * -> *) a. PromiseWriter m a -> a -> STM m ()
writePromise :: Either
  (HandlerError versionNumber)
  (HandshakeConnectionResult
     (Handle
        muxMode initiatorCtx responderCtx versionData ByteString m a b)
     (versionNumber, versionData))
-> STM m ()
writePromise }
                             Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer
                             connectionId :: ConnectionId peerAddr
connectionId@ConnectionId { peerAddr
localAddress :: forall addr. ConnectionId addr -> addr
localAddress :: peerAddr
localAddress
                                                       , peerAddr
remoteAddress :: forall addr. ConnectionId addr -> addr
remoteAddress :: peerAddr
remoteAddress }
                             DiffTime -> socket -> Maybe (ReadBuffer m) -> m (Bearer m)
mkMuxBearer
                             (Maybe (ReadBuffer m) -> m ()) -> m ()
withBuffer
        = MaskedAction { (forall x. m x -> m x) -> m ()
runWithUnmask :: (forall x. m x -> m x) -> m ()
runWithUnmask :: (forall x. m x -> m x) -> m ()
runWithUnmask }
      where
        runWithUnmask :: (forall x. m x -> m x) -> m ()
        runWithUnmask :: (forall x. m x -> m x) -> m ()
runWithUnmask forall x. m x -> m x
unmask =
          Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> peerAddr -> ErrorContext -> m () -> m ()
forall x.
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> peerAddr -> ErrorContext -> m x -> m x
classifyExceptions Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer peerAddr
remoteAddress ErrorContext
InboundError (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
            String -> m ()
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread ([String] -> String
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [String
"in-conn-hndlr-"
                                    , peerAddr -> String
forall a. Show a => a -> String
show peerAddr
localAddress
                                    , String
"-"
                                    , peerAddr -> String
forall a. Show a => a -> String
show peerAddr
remoteAddress
                                    ])
            handshakeBearer <- DiffTime -> socket -> Maybe (ReadBuffer m) -> m (Bearer m)
mkMuxBearer DiffTime
sduHandshakeTimeout socket
socket Maybe (ReadBuffer m)
forall a. Maybe a
Nothing
            hsResult <-
              unmask (runHandshakeServer handshakeBearer
                                         connectionId
                                         handshakeArguments
                                         (Handshake.updateVersionData updateVersionDataFn versionedApplication))
              -- 'runHandshakeServer' only deals with protocol limit errors or
              -- handshake negotiation failures, but not with 'IOException's or
              -- 'MuxError's.
              `catch` \(SomeException
err :: SomeException) -> do
                STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ Either
  (HandlerError versionNumber)
  (HandshakeConnectionResult
     (Handle
        muxMode initiatorCtx responderCtx versionData ByteString m a b)
     (versionNumber, versionData))
-> STM m ()
writePromise (HandlerError versionNumber
-> Either
     (HandlerError versionNumber)
     (HandshakeConnectionResult
        (Handle
           muxMode initiatorCtx responderCtx versionData ByteString m a b)
        (versionNumber, versionData))
forall a b. a -> Either a b
Left (SomeException -> HandlerError versionNumber
forall versionNumber. SomeException -> HandlerError versionNumber
HandlerError SomeException
err))
                SomeException
-> m (Either
        (HandshakeException versionNumber)
        (HandshakeResult
           (OuroborosBundle
              muxMode initiatorCtx responderCtx ByteString m a b)
           versionNumber
           versionData))
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SomeException
err

            case hsResult of
              Left !HandshakeException versionNumber
err -> do
                STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ Either
  (HandlerError versionNumber)
  (HandshakeConnectionResult
     (Handle
        muxMode initiatorCtx responderCtx versionData ByteString m a b)
     (versionNumber, versionData))
-> STM m ()
writePromise (HandlerError versionNumber
-> Either
     (HandlerError versionNumber)
     (HandshakeConnectionResult
        (Handle
           muxMode initiatorCtx responderCtx versionData ByteString m a b)
        (versionNumber, versionData))
forall a b. a -> Either a b
Left (HandshakeException versionNumber -> HandlerError versionNumber
forall versionNumber.
HandshakeException versionNumber -> HandlerError versionNumber
HandleHandshakeServerError HandshakeException versionNumber
err))
                Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> ConnectionHandlerTrace versionNumber versionData -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer (HandshakeException versionNumber
-> ConnectionHandlerTrace versionNumber versionData
forall versionNumber versionData.
HandshakeException versionNumber
-> ConnectionHandlerTrace versionNumber versionData
TrHandshakeServerError HandshakeException versionNumber
err)
              Right (HandshakeNegotiationResult OuroborosBundle muxMode initiatorCtx responderCtx ByteString m a b
app versionNumber
versionNumber versionData
agreedOptions) -> do
               Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> ConnectionHandlerTrace versionNumber versionData -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer (versionNumber
-> versionData -> ConnectionHandlerTrace versionNumber versionData
forall versionNumber versionData.
versionNumber
-> versionData -> ConnectionHandlerTrace versionNumber versionData
TrHandshakeSuccess versionNumber
versionNumber versionData
agreedOptions)
               controlMessageBundle
                 <- (\StrictTVar m ControlMessage
a StrictTVar m ControlMessage
b StrictTVar m ControlMessage
c -> WithProtocolTemperature 'Hot (StrictTVar m ControlMessage)
-> WithProtocolTemperature 'Warm (StrictTVar m ControlMessage)
-> WithProtocolTemperature
     'Established (StrictTVar m ControlMessage)
-> TemperatureBundle (StrictTVar m ControlMessage)
forall a.
WithProtocolTemperature 'Hot a
-> WithProtocolTemperature 'Warm a
-> WithProtocolTemperature 'Established a
-> TemperatureBundle a
TemperatureBundle (StrictTVar m ControlMessage
-> WithProtocolTemperature 'Hot (StrictTVar m ControlMessage)
forall a. a -> WithProtocolTemperature 'Hot a
WithHot StrictTVar m ControlMessage
a) (StrictTVar m ControlMessage
-> WithProtocolTemperature 'Warm (StrictTVar m ControlMessage)
forall a. a -> WithProtocolTemperature 'Warm a
WithWarm StrictTVar m ControlMessage
b) (StrictTVar m ControlMessage
-> WithProtocolTemperature
     'Established (StrictTVar m ControlMessage)
forall a. a -> WithProtocolTemperature 'Established a
WithEstablished StrictTVar m ControlMessage
c))
                     (StrictTVar m ControlMessage
 -> StrictTVar m ControlMessage
 -> StrictTVar m ControlMessage
 -> TemperatureBundle (StrictTVar m ControlMessage))
-> m (StrictTVar m ControlMessage)
-> m (StrictTVar m ControlMessage
      -> StrictTVar m ControlMessage
      -> TemperatureBundle (StrictTVar m ControlMessage))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ControlMessage -> m (StrictTVar m ControlMessage)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ControlMessage
Continue
                     m (StrictTVar m ControlMessage
   -> StrictTVar m ControlMessage
   -> TemperatureBundle (StrictTVar m ControlMessage))
-> m (StrictTVar m ControlMessage)
-> m (StrictTVar m ControlMessage
      -> TemperatureBundle (StrictTVar m ControlMessage))
forall a b. m (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> ControlMessage -> m (StrictTVar m ControlMessage)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ControlMessage
Continue
                     m (StrictTVar m ControlMessage
   -> TemperatureBundle (StrictTVar m ControlMessage))
-> m (StrictTVar m ControlMessage)
-> m (TemperatureBundle (StrictTVar m ControlMessage))
forall a b. m (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> ControlMessage -> m (StrictTVar m ControlMessage)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ControlMessage
Continue
               mux <- Mx.new (mkMiniProtocolInfos (runForkPolicy forkPolicy remoteAddress) app)

               let !handle = Handle {
                       hMux :: Mux muxMode m
hMux            = Mux muxMode m
mux,
                       hMuxBundle :: OuroborosBundle muxMode initiatorCtx responderCtx ByteString m a b
hMuxBundle      = OuroborosBundle muxMode initiatorCtx responderCtx ByteString m a b
app,
                       hControlMessage :: TemperatureBundle (StrictTVar m ControlMessage)
hControlMessage = TemperatureBundle (StrictTVar m ControlMessage)
controlMessageBundle,
                       hVersionData :: versionData
hVersionData    = versionData
agreedOptions
                     }
               atomically $ writePromise (Right $ HandshakeConnectionResult handle (versionNumber, agreedOptions))
               withBuffer \Maybe (ReadBuffer m)
buffer -> do
                 bearer <- DiffTime -> socket -> Maybe (ReadBuffer m) -> m (Bearer m)
mkMuxBearer DiffTime
sduTimeout socket
socket Maybe (ReadBuffer m)
buffer
                 countersVar <- newTVarIO . SJust $ ResponderCounters 0 0
                 unmask $ Mx.run (Mx.tracersWithBearer connectionId muxTracers {
                               Mx.tracer = Mx.tracer muxTracers <> inboundGovernorMuxTracer countersVar
                             })
                             mux bearer
              Right (HandshakeQueryResult Map versionNumber (Either Text versionData)
vMap) -> do
                STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ Either
  (HandlerError versionNumber)
  (HandshakeConnectionResult
     (Handle
        muxMode initiatorCtx responderCtx versionData ByteString m a b)
     (versionNumber, versionData))
-> STM m ()
writePromise (HandshakeConnectionResult
  (Handle
     muxMode initiatorCtx responderCtx versionData ByteString m a b)
  (versionNumber, versionData)
-> Either
     (HandlerError versionNumber)
     (HandshakeConnectionResult
        (Handle
           muxMode initiatorCtx responderCtx versionData ByteString m a b)
        (versionNumber, versionData))
forall a b. b -> Either a b
Right HandshakeConnectionResult
  (Handle
     muxMode initiatorCtx responderCtx versionData ByteString m a b)
  (versionNumber, versionData)
forall handle version. HandshakeConnectionResult handle version
HandshakeConnectionQuery)
                Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> ConnectionHandlerTrace versionNumber versionData -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer (ConnectionHandlerTrace versionNumber versionData -> m ())
-> ConnectionHandlerTrace versionNumber versionData -> m ()
forall a b. (a -> b) -> a -> b
$ Map versionNumber (Either Text versionData)
-> ConnectionHandlerTrace versionNumber versionData
forall versionNumber versionData.
Map versionNumber (Either Text versionData)
-> ConnectionHandlerTrace versionNumber versionData
TrHandshakeQuery Map versionNumber (Either Text versionData)
vMap
                -- Wait 20s for client to receive response, who should close the connection.
                DiffTime -> m ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
handshake_QUERY_SHUTDOWN_DELAY



--
-- Tracing
--


-- | 'ConnectionHandlerTrace' is embedded into
-- 'Ouroboros.Network.ConnectionManager.Core.Trace' with
-- 'Ouroboros.Network.ConnectionManager.Types.TrConnectionHandler' constructor.
-- It already includes 'ConnectionId' so we don't need to take care of it here.
--
-- TODO: when 'Handshake' will get its own tracer, independent of 'Mux', it
-- should be embedded into 'ConnectionHandlerTrace'.
--
data ConnectionHandlerTrace versionNumber versionData =
      TrHandshakeSuccess versionNumber versionData
    | TrHandshakeQuery (Map versionNumber (Either Text versionData))
    | TrHandshakeClientError
        (HandshakeException versionNumber)
    | TrHandshakeServerError
        (HandshakeException versionNumber)
    | TrConnectionHandlerError ErrorContext SomeException ErrorCommand
  deriving Int -> ConnectionHandlerTrace versionNumber versionData -> ShowS
[ConnectionHandlerTrace versionNumber versionData] -> ShowS
ConnectionHandlerTrace versionNumber versionData -> String
(Int -> ConnectionHandlerTrace versionNumber versionData -> ShowS)
-> (ConnectionHandlerTrace versionNumber versionData -> String)
-> ([ConnectionHandlerTrace versionNumber versionData] -> ShowS)
-> Show (ConnectionHandlerTrace versionNumber versionData)
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
forall versionNumber versionData.
(Show versionNumber, Show versionData) =>
Int -> ConnectionHandlerTrace versionNumber versionData -> ShowS
forall versionNumber versionData.
(Show versionNumber, Show versionData) =>
[ConnectionHandlerTrace versionNumber versionData] -> ShowS
forall versionNumber versionData.
(Show versionNumber, Show versionData) =>
ConnectionHandlerTrace versionNumber versionData -> String
$cshowsPrec :: forall versionNumber versionData.
(Show versionNumber, Show versionData) =>
Int -> ConnectionHandlerTrace versionNumber versionData -> ShowS
showsPrec :: Int -> ConnectionHandlerTrace versionNumber versionData -> ShowS
$cshow :: forall versionNumber versionData.
(Show versionNumber, Show versionData) =>
ConnectionHandlerTrace versionNumber versionData -> String
show :: ConnectionHandlerTrace versionNumber versionData -> String
$cshowList :: forall versionNumber versionData.
(Show versionNumber, Show versionData) =>
[ConnectionHandlerTrace versionNumber versionData] -> ShowS
showList :: [ConnectionHandlerTrace versionNumber versionData] -> ShowS
Show