{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilyDependencies #-}
{-# LANGUAGE TypeOperators #-}
module Ouroboros.Network.ConnectionHandler
( Handle (..)
, HandleWithExpandedCtx
, HandleWithMinimalCtx
, HandleError (..)
, classifyHandleError
, MkMuxConnectionHandler (..)
, MuxConnectionHandler
, makeConnectionHandler
, MuxConnectionManager
, ConnectionManagerWithExpandedCtx
, ConnectionHandlerTrace (..)
) where
import Control.Applicative (Alternative)
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Exception (SomeAsyncException)
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadThrow hiding (handle)
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer, traceWith)
import Data.ByteString.Lazy (ByteString)
import Data.Map (Map)
import Data.Maybe.Strict
import Data.Text (Text)
import Data.Typeable (Typeable)
import Network.Mux (Mux)
import Network.Mux qualified as Mx
import Network.Mux.Trace
import Ouroboros.Network.ConnectionId (ConnectionId (..))
import Ouroboros.Network.ConnectionManager.Types
import Ouroboros.Network.Context (ExpandedInitiatorContext,
MinimalInitiatorContext, ResponderContext)
import Ouroboros.Network.ControlMessage (ControlMessage (..))
import Ouroboros.Network.InboundGovernor.State
import Ouroboros.Network.Mux
import Ouroboros.Network.MuxMode
import Ouroboros.Network.Protocol.Handshake
import Ouroboros.Network.Protocol.Handshake.Version qualified as Handshake
import Ouroboros.Network.RethrowPolicy
sduTimeout :: DiffTime
sduTimeout :: DiffTime
sduTimeout = DiffTime
30
sduHandshakeTimeout :: DiffTime
sduHandshakeTimeout :: DiffTime
sduHandshakeTimeout = DiffTime
10
data Handle (muxMode :: Mx.Mode) initiatorCtx responderCtx versionData bytes m a b =
Handle {
forall (muxMode :: Mode) initiatorCtx responderCtx versionData
bytes (m :: * -> *) a b.
Handle muxMode initiatorCtx responderCtx versionData bytes m a b
-> Mux muxMode m
hMux :: !(Mux muxMode m),
forall (muxMode :: Mode) initiatorCtx responderCtx versionData
bytes (m :: * -> *) a b.
Handle muxMode initiatorCtx responderCtx versionData bytes m a b
-> OuroborosBundle muxMode initiatorCtx responderCtx bytes m a b
hMuxBundle :: !(OuroborosBundle muxMode initiatorCtx responderCtx bytes m a b),
forall (muxMode :: Mode) initiatorCtx responderCtx versionData
bytes (m :: * -> *) a b.
Handle muxMode initiatorCtx responderCtx versionData bytes m a b
-> TemperatureBundle (StrictTVar m ControlMessage)
hControlMessage :: !(TemperatureBundle (StrictTVar m ControlMessage)),
forall (muxMode :: Mode) initiatorCtx responderCtx versionData
bytes (m :: * -> *) a b.
Handle muxMode initiatorCtx responderCtx versionData bytes m a b
-> versionData
hVersionData :: !versionData
}
data MkMuxConnectionHandler (muxMode :: Mx.Mode) socket initiatorCtx responderCtx
peerAddr versionNumber versionData bytes m a b where
MuxInitiatorConnectionHandler :: MkMuxConnectionHandler
Mx.InitiatorMode socket initiatorCtx responderCtx
peerAddr versionNumber versionData bytes m a b
MuxResponderConnectionHandler :: ( StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
-> MkMuxConnectionHandler
Mx.ResponderMode socket initiatorCtx responderCtx
peerAddr versionNumber versionData bytes m a b
MuxInitiatorResponderConnectionHandler
:: (versionData -> DataFlow)
-> ( StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
-> MkMuxConnectionHandler Mx.InitiatorResponderMode socket initiatorCtx responderCtx peerAddr
versionNumber versionData bytes m a b
type HandleWithExpandedCtx muxMode peerAddr versionData bytes m a b =
Handle muxMode (ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
versionData bytes m a b
type HandleWithMinimalCtx muxMode peerAddr versionData bytes m a b =
Handle muxMode (MinimalInitiatorContext peerAddr)
(ResponderContext peerAddr)
versionData bytes m a b
data HandleError (muxMode :: Mx.Mode) versionNumber where
HandleHandshakeClientError
:: HasInitiator muxMode ~ True
=> !(HandshakeException versionNumber)
-> HandleError muxMode versionNumber
HandleHandshakeServerError
:: HasResponder muxMode ~ True
=> !(HandshakeException versionNumber)
-> HandleError muxMode versionNumber
HandleError
:: !SomeException
-> HandleError muxMode versionNumber
instance Show versionNumber
=> Show (HandleError muxMode versionNumber) where
show :: HandleError muxMode versionNumber -> String
show (HandleHandshakeServerError HandshakeException versionNumber
err) = String
"HandleHandshakeServerError " String -> ShowS
forall a. [a] -> [a] -> [a]
++ HandshakeException versionNumber -> String
forall a. Show a => a -> String
show HandshakeException versionNumber
err
show (HandleHandshakeClientError HandshakeException versionNumber
err) = String
"HandleHandshakeClientError " String -> ShowS
forall a. [a] -> [a] -> [a]
++ HandshakeException versionNumber -> String
forall a. Show a => a -> String
show HandshakeException versionNumber
err
show (HandleError SomeException
err) = String
"HandleError " String -> ShowS
forall a. [a] -> [a] -> [a]
++ SomeException -> String
forall a. Show a => a -> String
show SomeException
err
classifyHandleError :: HandleError muxMode versionNumber
-> HandleErrorType
classifyHandleError :: forall (muxMode :: Mode) versionNumber.
HandleError muxMode versionNumber -> HandleErrorType
classifyHandleError (HandleHandshakeClientError (HandshakeProtocolLimit ProtocolLimitFailure
_)) =
HandleErrorType
HandshakeProtocolViolation
classifyHandleError (HandleHandshakeClientError (HandshakeProtocolError HandshakeProtocolError versionNumber
_)) =
HandleErrorType
HandshakeFailure
classifyHandleError (HandleHandshakeServerError (HandshakeProtocolLimit ProtocolLimitFailure
_)) =
HandleErrorType
HandshakeProtocolViolation
classifyHandleError (HandleHandshakeServerError (HandshakeProtocolError HandshakeProtocolError versionNumber
_)) =
HandleErrorType
HandshakeFailure
classifyHandleError (HandleError SomeException
_) =
HandleErrorType
HandshakeProtocolViolation
type MuxConnectionHandler muxMode socket initiatorCtx responderCtx peerAddr versionNumber versionData bytes m a b =
ConnectionHandler muxMode
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle muxMode initiatorCtx responderCtx versionData bytes m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
type MuxConnectionManager muxMode socket initiatorCtx responderCtx peerAddr versionData versionNumber bytes m a b =
ConnectionManager muxMode socket peerAddr
(Handle muxMode initiatorCtx responderCtx versionData bytes m a b)
(HandleError muxMode versionNumber)
m
type ConnectionManagerWithExpandedCtx muxMode socket peerAddr versionData versionNumber bytes m a b =
ConnectionManager muxMode socket peerAddr
(HandleWithExpandedCtx muxMode peerAddr versionData bytes m a b)
(HandleError muxMode versionNumber)
m
makeConnectionHandler
:: forall initiatorCtx responderCtx peerAddr muxMode socket versionNumber versionData m a b.
( Alternative (STM m)
, MonadAsync m
, MonadDelay m
, MonadFork m
, MonadLabelledSTM m
, MonadThrow (STM m)
, MonadTimer m
, MonadMask m
, Ord versionNumber
, Show peerAddr
, Typeable peerAddr
)
=> Mx.TracersWithBearer (ConnectionId peerAddr) m
-> ForkPolicy peerAddr
-> HandshakeArguments (ConnectionId peerAddr) versionNumber versionData m
-> Versions versionNumber versionData
(OuroborosBundle muxMode initiatorCtx responderCtx ByteString m a b)
-> (ThreadId m, RethrowPolicy)
-> MkMuxConnectionHandler muxMode socket initiatorCtx responderCtx peerAddr versionNumber versionData ByteString m a b
-> MuxConnectionHandler muxMode socket initiatorCtx responderCtx peerAddr
versionNumber versionData ByteString m a b
makeConnectionHandler :: forall initiatorCtx responderCtx peerAddr (muxMode :: Mode) socket
versionNumber versionData (m :: * -> *) a b.
(Alternative (STM m), MonadAsync m, MonadDelay m, MonadFork m,
MonadLabelledSTM m, MonadThrow (STM m), MonadTimer m, MonadMask m,
Ord versionNumber, Show peerAddr, Typeable peerAddr) =>
TracersWithBearer (ConnectionId peerAddr) m
-> ForkPolicy peerAddr
-> HandshakeArguments
(ConnectionId peerAddr) versionNumber versionData m
-> Versions
versionNumber
versionData
(OuroborosBundle
muxMode initiatorCtx responderCtx ByteString m a b)
-> (ThreadId m, RethrowPolicy)
-> MkMuxConnectionHandler
muxMode
socket
initiatorCtx
responderCtx
peerAddr
versionNumber
versionData
ByteString
m
a
b
-> MuxConnectionHandler
muxMode
socket
initiatorCtx
responderCtx
peerAddr
versionNumber
versionData
ByteString
m
a
b
makeConnectionHandler TracersWithBearer (ConnectionId peerAddr) m
muxTracers ForkPolicy peerAddr
forkPolicy
HandshakeArguments
(ConnectionId peerAddr) versionNumber versionData m
handshakeArguments
Versions
versionNumber
versionData
(OuroborosBundle
muxMode initiatorCtx responderCtx ByteString m a b)
versionedApplication
(ThreadId m
mainThreadId, RethrowPolicy
rethrowPolicy) =
\case
MkMuxConnectionHandler
muxMode
socket
initiatorCtx
responderCtx
peerAddr
versionNumber
versionData
ByteString
m
a
b
MuxInitiatorConnectionHandler ->
WithMuxTuple
muxMode
(ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m)
-> MuxConnectionHandler
muxMode
socket
initiatorCtx
responderCtx
peerAddr
versionNumber
versionData
ByteString
m
a
b
forall (muxMode :: Mode) handlerTrace socket peerAddr handle
handleError versionNumber versionData (m :: * -> *).
WithMuxTuple
muxMode
(ConnectionHandlerFn
handlerTrace
socket
peerAddr
handle
handleError
versionNumber
versionData
m)
-> ConnectionHandler
muxMode
handlerTrace
socket
peerAddr
handle
handleError
versionNumber
versionData
m
ConnectionHandler (WithMuxTuple
muxMode
(ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m)
-> MuxConnectionHandler
muxMode
socket
initiatorCtx
responderCtx
peerAddr
versionNumber
versionData
ByteString
m
a
b)
-> (ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
-> WithMuxTuple
muxMode
(ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m))
-> ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
-> MuxConnectionHandler
muxMode
socket
initiatorCtx
responderCtx
peerAddr
versionNumber
versionData
ByteString
m
a
b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
-> WithMuxTuple
muxMode
(ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m)
ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
-> WithMuxMode
'InitiatorMode
(ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m)
(ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m)
forall a b. a -> WithMuxMode 'InitiatorMode a b
WithInitiatorMode
(ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
-> MuxConnectionHandler
muxMode
socket
initiatorCtx
responderCtx
peerAddr
versionNumber
versionData
ByteString
m
a
b)
-> ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
-> MuxConnectionHandler
muxMode
socket
initiatorCtx
responderCtx
peerAddr
versionNumber
versionData
ByteString
m
a
b
forall a b. (a -> b) -> a -> b
$ (HasInitiator muxMode ~ 'True) =>
InResponderMode
muxMode
(StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace),
versionData -> DataFlow)
-> ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
InResponderMode
muxMode
(StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace),
versionData -> DataFlow)
-> ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
outboundConnectionHandler InResponderMode
muxMode
(StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace),
versionData -> DataFlow)
forall (mode :: Mode) a. InResponderMode mode a
NotInResponderMode
MuxResponderConnectionHandler StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace)
inboundGovernorMuxTracer ->
WithMuxTuple
muxMode
(ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m)
-> MuxConnectionHandler
muxMode
socket
initiatorCtx
responderCtx
peerAddr
versionNumber
versionData
ByteString
m
a
b
forall (muxMode :: Mode) handlerTrace socket peerAddr handle
handleError versionNumber versionData (m :: * -> *).
WithMuxTuple
muxMode
(ConnectionHandlerFn
handlerTrace
socket
peerAddr
handle
handleError
versionNumber
versionData
m)
-> ConnectionHandler
muxMode
handlerTrace
socket
peerAddr
handle
handleError
versionNumber
versionData
m
ConnectionHandler (WithMuxTuple
muxMode
(ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m)
-> MuxConnectionHandler
muxMode
socket
initiatorCtx
responderCtx
peerAddr
versionNumber
versionData
ByteString
m
a
b)
-> ((StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
-> WithMuxTuple
muxMode
(ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m))
-> (StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
-> MuxConnectionHandler
muxMode
socket
initiatorCtx
responderCtx
peerAddr
versionNumber
versionData
ByteString
m
a
b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
-> WithMuxTuple
muxMode
(ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m)
ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
-> WithMuxMode
'ResponderMode
(ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m)
(ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m)
forall b a. b -> WithMuxMode 'ResponderMode a b
WithResponderMode (ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
-> WithMuxTuple
muxMode
(ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m))
-> ((StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
-> ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m)
-> (StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
-> WithMuxTuple
muxMode
(ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (HasResponder muxMode ~ 'True) =>
(StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
-> ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
(StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
-> ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
inboundConnectionHandler ((StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
-> MuxConnectionHandler
muxMode
socket
initiatorCtx
responderCtx
peerAddr
versionNumber
versionData
ByteString
m
a
b)
-> (StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
-> MuxConnectionHandler
muxMode
socket
initiatorCtx
responderCtx
peerAddr
versionNumber
versionData
ByteString
m
a
b
forall a b. (a -> b) -> a -> b
$ StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace)
inboundGovernorMuxTracer
MuxInitiatorResponderConnectionHandler versionData -> DataFlow
connectionDataFlow StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace)
inboundGovernorMuxTracer ->
WithMuxTuple
muxMode
(ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m)
-> MuxConnectionHandler
muxMode
socket
initiatorCtx
responderCtx
peerAddr
versionNumber
versionData
ByteString
m
a
b
forall (muxMode :: Mode) handlerTrace socket peerAddr handle
handleError versionNumber versionData (m :: * -> *).
WithMuxTuple
muxMode
(ConnectionHandlerFn
handlerTrace
socket
peerAddr
handle
handleError
versionNumber
versionData
m)
-> ConnectionHandler
muxMode
handlerTrace
socket
peerAddr
handle
handleError
versionNumber
versionData
m
ConnectionHandler (WithMuxTuple
muxMode
(ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m)
-> MuxConnectionHandler
muxMode
socket
initiatorCtx
responderCtx
peerAddr
versionNumber
versionData
ByteString
m
a
b)
-> WithMuxTuple
muxMode
(ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m)
-> MuxConnectionHandler
muxMode
socket
initiatorCtx
responderCtx
peerAddr
versionNumber
versionData
ByteString
m
a
b
forall a b. (a -> b) -> a -> b
$ ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
-> ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
-> WithMuxMode
'InitiatorResponderMode
(ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m)
(ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m)
forall a b. a -> b -> WithMuxMode 'InitiatorResponderMode a b
WithInitiatorResponderMode
((HasInitiator muxMode ~ 'True) =>
InResponderMode
muxMode
(StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace),
versionData -> DataFlow)
-> ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
InResponderMode
muxMode
(StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace),
versionData -> DataFlow)
-> ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
outboundConnectionHandler (InResponderMode
muxMode
(StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace),
versionData -> DataFlow)
-> ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m)
-> InResponderMode
muxMode
(StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace),
versionData -> DataFlow)
-> ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
forall a b. (a -> b) -> a -> b
$ (StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace),
versionData -> DataFlow)
-> InResponderMode
muxMode
(StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace),
versionData -> DataFlow)
forall (mode :: Mode) a.
(HasResponder mode ~ 'True) =>
a -> InResponderMode mode a
InResponderMode (StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace)
inboundGovernorMuxTracer, versionData -> DataFlow
connectionDataFlow))
((HasResponder muxMode ~ 'True) =>
(StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
-> ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
(StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
-> ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
inboundConnectionHandler StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace)
inboundGovernorMuxTracer)
where
classifyExceptions :: forall x.
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> peerAddr
-> ErrorContext
-> m x -> m x
classifyExceptions :: forall x.
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> peerAddr -> ErrorContext -> m x -> m x
classifyExceptions Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer peerAddr
remoteAddress ErrorContext
ctx m x
io =
(SomeException -> Maybe SomeException)
-> m x -> (SomeException -> m x) -> m x
forall e b a.
Exception e =>
(e -> Maybe b) -> m a -> (b -> m a) -> m a
forall (m :: * -> *) e b a.
(MonadCatch m, Exception e) =>
(e -> Maybe b) -> m a -> (b -> m a) -> m a
catchJust
(\SomeException
e -> case SomeException -> Maybe SomeAsyncException
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e :: Maybe SomeAsyncException of
Just SomeAsyncException
_ -> Maybe SomeException
forall a. Maybe a
Nothing
Maybe SomeAsyncException
Nothing -> SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just SomeException
e)
m x
io
((SomeException -> m x) -> m x) -> (SomeException -> m x) -> m x
forall a b. (a -> b) -> a -> b
$ \SomeException
err -> do
let cmd :: ErrorCommand
cmd = RethrowPolicy -> ErrorContext -> SomeException -> ErrorCommand
runRethrowPolicy RethrowPolicy
rethrowPolicy ErrorContext
ctx SomeException
err
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> ConnectionHandlerTrace versionNumber versionData -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer (ErrorContext
-> SomeException
-> ErrorCommand
-> ConnectionHandlerTrace versionNumber versionData
forall versionNumber versionData.
ErrorContext
-> SomeException
-> ErrorCommand
-> ConnectionHandlerTrace versionNumber versionData
TrConnectionHandlerError ErrorContext
ctx SomeException
err ErrorCommand
cmd)
case ErrorCommand
cmd of
ErrorCommand
ShutdownNode -> do
ThreadId m -> ExceptionInHandler -> m ()
forall e. Exception e => ThreadId m -> e -> m ()
forall (m :: * -> *) e.
(MonadFork m, Exception e) =>
ThreadId m -> e -> m ()
throwTo ThreadId m
mainThreadId (peerAddr -> SomeException -> ExceptionInHandler
forall peerAddr.
(Typeable peerAddr, Show peerAddr) =>
peerAddr -> SomeException -> ExceptionInHandler
ExceptionInHandler peerAddr
remoteAddress SomeException
err)
SomeException -> m x
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SomeException
err
ErrorCommand
ShutdownPeer ->
ExceptionInHandler -> m x
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (peerAddr -> SomeException -> ExceptionInHandler
forall peerAddr.
(Typeable peerAddr, Show peerAddr) =>
peerAddr -> SomeException -> ExceptionInHandler
ExceptionInHandler peerAddr
remoteAddress SomeException
err)
outboundConnectionHandler
:: HasInitiator muxMode ~ True
=> InResponderMode muxMode ( StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace)
, versionData -> DataFlow)
-> ConnectionHandlerFn (ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
outboundConnectionHandler :: (HasInitiator muxMode ~ 'True) =>
InResponderMode
muxMode
(StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace),
versionData -> DataFlow)
-> ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
outboundConnectionHandler InResponderMode
muxMode
(StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace),
versionData -> DataFlow)
inResponderMode
versionData -> versionData
versionDataFn
socket
socket
PromiseWriter { Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
-> STM m ()
writePromise :: Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
-> STM m ()
writePromise :: forall (m :: * -> *) a. PromiseWriter m a -> a -> STM m ()
writePromise }
Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer
connectionId :: ConnectionId peerAddr
connectionId@ConnectionId { peerAddr
localAddress :: peerAddr
localAddress :: forall addr. ConnectionId addr -> addr
localAddress
, peerAddr
remoteAddress :: peerAddr
remoteAddress :: forall addr. ConnectionId addr -> addr
remoteAddress }
DiffTime -> socket -> Maybe (ReadBuffer m) -> m (Bearer m)
mkMuxBearer
(Maybe (ReadBuffer m) -> m ()) -> m ()
withBuffer
= MaskedAction { (forall x. m x -> m x) -> m ()
runWithUnmask :: (forall x. m x -> m x) -> m ()
runWithUnmask :: (forall x. m x -> m x) -> m ()
runWithUnmask }
where
runWithUnmask :: (forall x. m x -> m x) -> m ()
runWithUnmask :: (forall x. m x -> m x) -> m ()
runWithUnmask forall x. m x -> m x
unmask =
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> peerAddr -> ErrorContext -> m () -> m ()
forall x.
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> peerAddr -> ErrorContext -> m x -> m x
classifyExceptions Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer peerAddr
remoteAddress ErrorContext
OutboundError (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
String -> m ()
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread ([String] -> String
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [String
"out-conn-hndlr-"
, peerAddr -> String
forall a. Show a => a -> String
show peerAddr
localAddress
, String
"-"
, peerAddr -> String
forall a. Show a => a -> String
show peerAddr
remoteAddress
])
handshakeBearer <- DiffTime -> socket -> Maybe (ReadBuffer m) -> m (Bearer m)
mkMuxBearer DiffTime
sduHandshakeTimeout socket
socket Maybe (ReadBuffer m)
forall a. Maybe a
Nothing
hsResult <-
unmask (runHandshakeClient handshakeBearer
connectionId
handshakeArguments
(Handshake.updateVersionData versionDataFn versionedApplication))
`catch` \(SomeException
err :: SomeException) -> do
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
-> STM m ()
writePromise (HandleError muxMode versionNumber
-> Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
forall a b. a -> Either a b
Left (SomeException -> HandleError muxMode versionNumber
forall (muxMode :: Mode) versionNumber.
SomeException -> HandleError muxMode versionNumber
HandleError SomeException
err))
SomeException
-> m (Either
(HandshakeException versionNumber)
(HandshakeResult
(OuroborosBundle
muxMode initiatorCtx responderCtx ByteString m a b)
versionNumber
versionData))
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SomeException
err
case hsResult of
Left !HandshakeException versionNumber
err -> do
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
-> STM m ()
writePromise (HandleError muxMode versionNumber
-> Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
forall a b. a -> Either a b
Left (HandshakeException versionNumber
-> HandleError muxMode versionNumber
forall (muxMode :: Mode) versionNumber.
(HasInitiator muxMode ~ 'True) =>
HandshakeException versionNumber
-> HandleError muxMode versionNumber
HandleHandshakeClientError HandshakeException versionNumber
err))
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> ConnectionHandlerTrace versionNumber versionData -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer (HandshakeException versionNumber
-> ConnectionHandlerTrace versionNumber versionData
forall versionNumber versionData.
HandshakeException versionNumber
-> ConnectionHandlerTrace versionNumber versionData
TrHandshakeClientError HandshakeException versionNumber
err)
Right (HandshakeNegotiationResult OuroborosBundle muxMode initiatorCtx responderCtx ByteString m a b
app versionNumber
versionNumber versionData
agreedOptions) -> do
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> ConnectionHandlerTrace versionNumber versionData -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer (versionNumber
-> versionData -> ConnectionHandlerTrace versionNumber versionData
forall versionNumber versionData.
versionNumber
-> versionData -> ConnectionHandlerTrace versionNumber versionData
TrHandshakeSuccess versionNumber
versionNumber versionData
agreedOptions)
controlMessageBundle
<- (\StrictTVar m ControlMessage
a StrictTVar m ControlMessage
b StrictTVar m ControlMessage
c -> WithProtocolTemperature 'Hot (StrictTVar m ControlMessage)
-> WithProtocolTemperature 'Warm (StrictTVar m ControlMessage)
-> WithProtocolTemperature
'Established (StrictTVar m ControlMessage)
-> TemperatureBundle (StrictTVar m ControlMessage)
forall a.
WithProtocolTemperature 'Hot a
-> WithProtocolTemperature 'Warm a
-> WithProtocolTemperature 'Established a
-> TemperatureBundle a
TemperatureBundle (StrictTVar m ControlMessage
-> WithProtocolTemperature 'Hot (StrictTVar m ControlMessage)
forall a. a -> WithProtocolTemperature 'Hot a
WithHot StrictTVar m ControlMessage
a) (StrictTVar m ControlMessage
-> WithProtocolTemperature 'Warm (StrictTVar m ControlMessage)
forall a. a -> WithProtocolTemperature 'Warm a
WithWarm StrictTVar m ControlMessage
b) (StrictTVar m ControlMessage
-> WithProtocolTemperature
'Established (StrictTVar m ControlMessage)
forall a. a -> WithProtocolTemperature 'Established a
WithEstablished StrictTVar m ControlMessage
c))
(StrictTVar m ControlMessage
-> StrictTVar m ControlMessage
-> StrictTVar m ControlMessage
-> TemperatureBundle (StrictTVar m ControlMessage))
-> m (StrictTVar m ControlMessage)
-> m (StrictTVar m ControlMessage
-> StrictTVar m ControlMessage
-> TemperatureBundle (StrictTVar m ControlMessage))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ControlMessage -> m (StrictTVar m ControlMessage)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ControlMessage
Continue
m (StrictTVar m ControlMessage
-> StrictTVar m ControlMessage
-> TemperatureBundle (StrictTVar m ControlMessage))
-> m (StrictTVar m ControlMessage)
-> m (StrictTVar m ControlMessage
-> TemperatureBundle (StrictTVar m ControlMessage))
forall a b. m (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> ControlMessage -> m (StrictTVar m ControlMessage)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ControlMessage
Continue
m (StrictTVar m ControlMessage
-> TemperatureBundle (StrictTVar m ControlMessage))
-> m (StrictTVar m ControlMessage)
-> m (TemperatureBundle (StrictTVar m ControlMessage))
forall a b. m (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> ControlMessage -> m (StrictTVar m ControlMessage)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ControlMessage
Continue
mux <- Mx.new (mkMiniProtocolInfos (runForkPolicy forkPolicy remoteAddress) app)
let !handle = Handle {
hMux :: Mux muxMode m
hMux = Mux muxMode m
mux,
hMuxBundle :: OuroborosBundle muxMode initiatorCtx responderCtx ByteString m a b
hMuxBundle = OuroborosBundle muxMode initiatorCtx responderCtx ByteString m a b
app,
hControlMessage :: TemperatureBundle (StrictTVar m ControlMessage)
hControlMessage = TemperatureBundle (StrictTVar m ControlMessage)
controlMessageBundle,
hVersionData :: versionData
hVersionData = versionData
agreedOptions
}
atomically $ writePromise (Right $ HandshakeConnectionResult handle (versionNumber, agreedOptions))
withBuffer \Maybe (ReadBuffer m)
buffer -> do
bearer <- DiffTime -> socket -> Maybe (ReadBuffer m) -> m (Bearer m)
mkMuxBearer DiffTime
sduTimeout socket
socket Maybe (ReadBuffer m)
buffer
muxTracers' <- case inResponderMode of
InResponderMode (StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace)
inboundGovernorMuxTracer, versionData -> DataFlow
connectionDataFlow)
| DataFlow
Duplex <- versionData -> DataFlow
connectionDataFlow versionData
agreedOptions -> do
countersVar <- StrictMaybe ResponderCounters
-> m (StrictTVar m (StrictMaybe ResponderCounters))
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO (StrictMaybe ResponderCounters
-> m (StrictTVar m (StrictMaybe ResponderCounters)))
-> (ResponderCounters -> StrictMaybe ResponderCounters)
-> ResponderCounters
-> m (StrictTVar m (StrictMaybe ResponderCounters))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ResponderCounters -> StrictMaybe ResponderCounters
forall a. a -> StrictMaybe a
SJust (ResponderCounters
-> m (StrictTVar m (StrictMaybe ResponderCounters)))
-> ResponderCounters
-> m (StrictTVar m (StrictMaybe ResponderCounters))
forall a b. (a -> b) -> a -> b
$ Int -> Int -> ResponderCounters
ResponderCounters Int
0 Int
0
pure $ Mx.tracersWithBearer connectionId muxTracers {
Mx.tracer = Mx.tracer muxTracers <> inboundGovernorMuxTracer countersVar
}
InResponderMode
muxMode
(StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace),
versionData -> DataFlow)
_notResponder ->
Tracers m -> m (Tracers m)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Tracers m -> m (Tracers m)) -> Tracers m -> m (Tracers m)
forall a b. (a -> b) -> a -> b
$ ConnectionId peerAddr
-> TracersWithBearer (ConnectionId peerAddr) m -> Tracers m
forall peerId (m :: * -> *).
peerId -> TracersWithBearer peerId m -> Tracers m
Mx.tracersWithBearer ConnectionId peerAddr
connectionId TracersWithBearer (ConnectionId peerAddr) m
muxTracers
unmask $ Mx.run muxTracers' mux bearer
Right (HandshakeQueryResult Map versionNumber (Either Text versionData)
vMap) -> do
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
-> STM m ()
writePromise (HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData)
-> Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
forall a b. b -> Either a b
Right HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData)
forall handle version. HandshakeConnectionResult handle version
HandshakeConnectionQuery)
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> ConnectionHandlerTrace versionNumber versionData -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer (ConnectionHandlerTrace versionNumber versionData -> m ())
-> ConnectionHandlerTrace versionNumber versionData -> m ()
forall a b. (a -> b) -> a -> b
$ Map versionNumber (Either Text versionData)
-> ConnectionHandlerTrace versionNumber versionData
forall versionNumber versionData.
Map versionNumber (Either Text versionData)
-> ConnectionHandlerTrace versionNumber versionData
TrHandshakeQuery Map versionNumber (Either Text versionData)
vMap
inboundConnectionHandler
:: HasResponder muxMode ~ True
=> ( StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
-> ConnectionHandlerFn (ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
inboundConnectionHandler :: (HasResponder muxMode ~ 'True) =>
(StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace))
-> ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
inboundConnectionHandler StrictTVar m (StrictMaybe ResponderCounters)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace)
inboundGovernorMuxTracer
versionData -> versionData
updateVersionDataFn
socket
socket
PromiseWriter { Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
-> STM m ()
writePromise :: forall (m :: * -> *) a. PromiseWriter m a -> a -> STM m ()
writePromise :: Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
-> STM m ()
writePromise }
Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer
connectionId :: ConnectionId peerAddr
connectionId@ConnectionId { peerAddr
localAddress :: forall addr. ConnectionId addr -> addr
localAddress :: peerAddr
localAddress
, peerAddr
remoteAddress :: forall addr. ConnectionId addr -> addr
remoteAddress :: peerAddr
remoteAddress }
DiffTime -> socket -> Maybe (ReadBuffer m) -> m (Bearer m)
mkMuxBearer
(Maybe (ReadBuffer m) -> m ()) -> m ()
withBuffer
= MaskedAction { (forall x. m x -> m x) -> m ()
runWithUnmask :: (forall x. m x -> m x) -> m ()
runWithUnmask :: (forall x. m x -> m x) -> m ()
runWithUnmask }
where
runWithUnmask :: (forall x. m x -> m x) -> m ()
runWithUnmask :: (forall x. m x -> m x) -> m ()
runWithUnmask forall x. m x -> m x
unmask =
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> peerAddr -> ErrorContext -> m () -> m ()
forall x.
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> peerAddr -> ErrorContext -> m x -> m x
classifyExceptions Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer peerAddr
remoteAddress ErrorContext
InboundError (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
String -> m ()
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread ([String] -> String
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [String
"in-conn-hndlr-"
, peerAddr -> String
forall a. Show a => a -> String
show peerAddr
localAddress
, String
"-"
, peerAddr -> String
forall a. Show a => a -> String
show peerAddr
remoteAddress
])
handshakeBearer <- DiffTime -> socket -> Maybe (ReadBuffer m) -> m (Bearer m)
mkMuxBearer DiffTime
sduHandshakeTimeout socket
socket Maybe (ReadBuffer m)
forall a. Maybe a
Nothing
hsResult <-
unmask (runHandshakeServer handshakeBearer
connectionId
handshakeArguments
(Handshake.updateVersionData updateVersionDataFn versionedApplication))
`catch` \(SomeException
err :: SomeException) -> do
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
-> STM m ()
writePromise (HandleError muxMode versionNumber
-> Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
forall a b. a -> Either a b
Left (SomeException -> HandleError muxMode versionNumber
forall (muxMode :: Mode) versionNumber.
SomeException -> HandleError muxMode versionNumber
HandleError SomeException
err))
SomeException
-> m (Either
(HandshakeException versionNumber)
(HandshakeResult
(OuroborosBundle
muxMode initiatorCtx responderCtx ByteString m a b)
versionNumber
versionData))
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SomeException
err
case hsResult of
Left !HandshakeException versionNumber
err -> do
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
-> STM m ()
writePromise (HandleError muxMode versionNumber
-> Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
forall a b. a -> Either a b
Left (HandshakeException versionNumber
-> HandleError muxMode versionNumber
forall (muxMode :: Mode) versionNumber.
(HasResponder muxMode ~ 'True) =>
HandshakeException versionNumber
-> HandleError muxMode versionNumber
HandleHandshakeServerError HandshakeException versionNumber
err))
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> ConnectionHandlerTrace versionNumber versionData -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer (HandshakeException versionNumber
-> ConnectionHandlerTrace versionNumber versionData
forall versionNumber versionData.
HandshakeException versionNumber
-> ConnectionHandlerTrace versionNumber versionData
TrHandshakeServerError HandshakeException versionNumber
err)
Right (HandshakeNegotiationResult OuroborosBundle muxMode initiatorCtx responderCtx ByteString m a b
app versionNumber
versionNumber versionData
agreedOptions) -> do
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> ConnectionHandlerTrace versionNumber versionData -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer (versionNumber
-> versionData -> ConnectionHandlerTrace versionNumber versionData
forall versionNumber versionData.
versionNumber
-> versionData -> ConnectionHandlerTrace versionNumber versionData
TrHandshakeSuccess versionNumber
versionNumber versionData
agreedOptions)
controlMessageBundle
<- (\StrictTVar m ControlMessage
a StrictTVar m ControlMessage
b StrictTVar m ControlMessage
c -> WithProtocolTemperature 'Hot (StrictTVar m ControlMessage)
-> WithProtocolTemperature 'Warm (StrictTVar m ControlMessage)
-> WithProtocolTemperature
'Established (StrictTVar m ControlMessage)
-> TemperatureBundle (StrictTVar m ControlMessage)
forall a.
WithProtocolTemperature 'Hot a
-> WithProtocolTemperature 'Warm a
-> WithProtocolTemperature 'Established a
-> TemperatureBundle a
TemperatureBundle (StrictTVar m ControlMessage
-> WithProtocolTemperature 'Hot (StrictTVar m ControlMessage)
forall a. a -> WithProtocolTemperature 'Hot a
WithHot StrictTVar m ControlMessage
a) (StrictTVar m ControlMessage
-> WithProtocolTemperature 'Warm (StrictTVar m ControlMessage)
forall a. a -> WithProtocolTemperature 'Warm a
WithWarm StrictTVar m ControlMessage
b) (StrictTVar m ControlMessage
-> WithProtocolTemperature
'Established (StrictTVar m ControlMessage)
forall a. a -> WithProtocolTemperature 'Established a
WithEstablished StrictTVar m ControlMessage
c))
(StrictTVar m ControlMessage
-> StrictTVar m ControlMessage
-> StrictTVar m ControlMessage
-> TemperatureBundle (StrictTVar m ControlMessage))
-> m (StrictTVar m ControlMessage)
-> m (StrictTVar m ControlMessage
-> StrictTVar m ControlMessage
-> TemperatureBundle (StrictTVar m ControlMessage))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ControlMessage -> m (StrictTVar m ControlMessage)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ControlMessage
Continue
m (StrictTVar m ControlMessage
-> StrictTVar m ControlMessage
-> TemperatureBundle (StrictTVar m ControlMessage))
-> m (StrictTVar m ControlMessage)
-> m (StrictTVar m ControlMessage
-> TemperatureBundle (StrictTVar m ControlMessage))
forall a b. m (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> ControlMessage -> m (StrictTVar m ControlMessage)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ControlMessage
Continue
m (StrictTVar m ControlMessage
-> TemperatureBundle (StrictTVar m ControlMessage))
-> m (StrictTVar m ControlMessage)
-> m (TemperatureBundle (StrictTVar m ControlMessage))
forall a b. m (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> ControlMessage -> m (StrictTVar m ControlMessage)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ControlMessage
Continue
mux <- Mx.new (mkMiniProtocolInfos (runForkPolicy forkPolicy remoteAddress) app)
let !handle = Handle {
hMux :: Mux muxMode m
hMux = Mux muxMode m
mux,
hMuxBundle :: OuroborosBundle muxMode initiatorCtx responderCtx ByteString m a b
hMuxBundle = OuroborosBundle muxMode initiatorCtx responderCtx ByteString m a b
app,
hControlMessage :: TemperatureBundle (StrictTVar m ControlMessage)
hControlMessage = TemperatureBundle (StrictTVar m ControlMessage)
controlMessageBundle,
hVersionData :: versionData
hVersionData = versionData
agreedOptions
}
atomically $ writePromise (Right $ HandshakeConnectionResult handle (versionNumber, agreedOptions))
withBuffer \Maybe (ReadBuffer m)
buffer -> do
bearer <- DiffTime -> socket -> Maybe (ReadBuffer m) -> m (Bearer m)
mkMuxBearer DiffTime
sduTimeout socket
socket Maybe (ReadBuffer m)
buffer
countersVar <- newTVarIO . SJust $ ResponderCounters 0 0
unmask $ Mx.run (Mx.tracersWithBearer connectionId muxTracers {
Mx.tracer = Mx.tracer muxTracers <> inboundGovernorMuxTracer countersVar
})
mux bearer
Right (HandshakeQueryResult Map versionNumber (Either Text versionData)
vMap) -> do
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
-> STM m ()
writePromise (HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData)
-> Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
forall a b. b -> Either a b
Right HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData)
forall handle version. HandshakeConnectionResult handle version
HandshakeConnectionQuery)
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> ConnectionHandlerTrace versionNumber versionData -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer (ConnectionHandlerTrace versionNumber versionData -> m ())
-> ConnectionHandlerTrace versionNumber versionData -> m ()
forall a b. (a -> b) -> a -> b
$ Map versionNumber (Either Text versionData)
-> ConnectionHandlerTrace versionNumber versionData
forall versionNumber versionData.
Map versionNumber (Either Text versionData)
-> ConnectionHandlerTrace versionNumber versionData
TrHandshakeQuery Map versionNumber (Either Text versionData)
vMap
DiffTime -> m ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
handshake_QUERY_SHUTDOWN_DELAY
data ConnectionHandlerTrace versionNumber versionData =
TrHandshakeSuccess versionNumber versionData
| TrHandshakeQuery (Map versionNumber (Either Text versionData))
| TrHandshakeClientError
(HandshakeException versionNumber)
| TrHandshakeServerError
(HandshakeException versionNumber)
| TrConnectionHandlerError ErrorContext SomeException ErrorCommand
deriving Int -> ConnectionHandlerTrace versionNumber versionData -> ShowS
[ConnectionHandlerTrace versionNumber versionData] -> ShowS
ConnectionHandlerTrace versionNumber versionData -> String
(Int -> ConnectionHandlerTrace versionNumber versionData -> ShowS)
-> (ConnectionHandlerTrace versionNumber versionData -> String)
-> ([ConnectionHandlerTrace versionNumber versionData] -> ShowS)
-> Show (ConnectionHandlerTrace versionNumber versionData)
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
forall versionNumber versionData.
(Show versionNumber, Show versionData) =>
Int -> ConnectionHandlerTrace versionNumber versionData -> ShowS
forall versionNumber versionData.
(Show versionNumber, Show versionData) =>
[ConnectionHandlerTrace versionNumber versionData] -> ShowS
forall versionNumber versionData.
(Show versionNumber, Show versionData) =>
ConnectionHandlerTrace versionNumber versionData -> String
$cshowsPrec :: forall versionNumber versionData.
(Show versionNumber, Show versionData) =>
Int -> ConnectionHandlerTrace versionNumber versionData -> ShowS
showsPrec :: Int -> ConnectionHandlerTrace versionNumber versionData -> ShowS
$cshow :: forall versionNumber versionData.
(Show versionNumber, Show versionData) =>
ConnectionHandlerTrace versionNumber versionData -> String
show :: ConnectionHandlerTrace versionNumber versionData -> String
$cshowList :: forall versionNumber versionData.
(Show versionNumber, Show versionData) =>
[ConnectionHandlerTrace versionNumber versionData] -> ShowS
showList :: [ConnectionHandlerTrace versionNumber versionData] -> ShowS
Show