{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeOperators #-}
module Ouroboros.Network.ConnectionHandler
( Handle (..)
, HandleWithExpandedCtx
, HandleWithMinimalCtx
, HandleError (..)
, classifyHandleError
, MuxConnectionHandler
, makeConnectionHandler
, MuxConnectionManager
, ConnectionManagerWithExpandedCtx
, ConnectionHandlerTrace (..)
) where
import Control.Applicative (Alternative)
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Exception (SomeAsyncException)
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadThrow hiding (handle)
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer, contramap, traceWith)
import Data.ByteString.Lazy (ByteString)
import Data.Map (Map)
import Data.Text (Text)
import Data.Typeable (Typeable)
import Network.Mux (Mux)
import Network.Mux qualified as Mx
import Ouroboros.Network.ConnectionId (ConnectionId (..))
import Ouroboros.Network.ConnectionManager.Types
import Ouroboros.Network.Context (ExpandedInitiatorContext,
MinimalInitiatorContext, ResponderContext)
import Ouroboros.Network.ControlMessage (ControlMessage (..))
import Ouroboros.Network.Mux
import Ouroboros.Network.MuxMode
import Ouroboros.Network.Protocol.Handshake
import Ouroboros.Network.Protocol.Handshake.Version qualified as Handshake
import Ouroboros.Network.RethrowPolicy
-- | Timeout of 30 seconds ('DiffTime' numeric literals are in seconds).
--
-- It is handed to @mkMuxBearer@ when the outbound handler creates the bearer
-- on which the negotiated mux is run (i.e. after a successful handshake).
-- NOTE(review): presumably this bounds the time spent on a single SDU;
-- the exact semantics are defined by the bearer constructor - confirm there.
sduTimeout :: DiffTime
sduTimeout = 30
-- | Timeout of 10 seconds ('DiffTime' numeric literals are in seconds).
--
-- It is handed to @mkMuxBearer@ when a connection handler creates the bearer
-- used for running the handshake (client or server side).
-- NOTE(review): presumably this bounds the time spent on a single SDU
-- during the handshake; confirm against the bearer constructor.
sduHandshakeTimeout :: DiffTime
sduHandshakeTimeout = 10
-- | Connection handle produced by a connection handler after a successful
-- handshake negotiation.  It is delivered to the caller through the promise
-- writer, wrapped in @HandshakeConnectionResult@.
--
-- All fields are strict.
data Handle (muxMode :: Mx.Mode) initiatorCtx responderCtx versionData bytes m a b =
    Handle {
        -- | The mux instance created (with @Mx.new@) for this connection.
        hMux            :: !(Mux muxMode m),

        -- | The application bundle selected by the handshake.
        hMuxBundle      :: !(OuroborosBundle muxMode initiatorCtx responderCtx bytes m a b),

        -- | One 'ControlMessage' variable per protocol temperature
        -- (hot, warm, established); the handlers initialise each of them to
        -- 'Continue'.
        hControlMessage :: !(TemperatureBundle (StrictTVar m ControlMessage)),

        -- | Version data agreed upon during the handshake.
        hVersionData    :: !versionData
      }
-- | 'Handle' whose initiator side runs with an 'ExpandedInitiatorContext'
-- and whose responder side runs with a 'ResponderContext', both indexed by
-- the same @peerAddr@.
type HandleWithExpandedCtx muxMode peerAddr versionData bytes m a b =
  Handle
    muxMode
    (ExpandedInitiatorContext peerAddr m)
    (ResponderContext peerAddr)
    versionData
    bytes
    m
    a
    b
-- | 'Handle' whose initiator side runs with a 'MinimalInitiatorContext'
-- and whose responder side runs with a 'ResponderContext', both indexed by
-- the same @peerAddr@.
type HandleWithMinimalCtx muxMode peerAddr versionData bytes m a b =
  Handle
    muxMode
    (MinimalInitiatorContext peerAddr)
    (ResponderContext peerAddr)
    versionData
    bytes
    m
    a
    b
-- | Errors a connection handler reports through its promise writer instead
-- of a 'Handle'.
data HandleError (muxMode :: Mx.Mode) versionNumber where
  -- | The handshake client returned a 'HandshakeException'.  Only
  -- constructible when @muxMode@ has an initiator side.
  HandleHandshakeClientError
    :: HasInitiator muxMode ~ True
    => !(HandshakeException versionNumber)
    -> HandleError muxMode versionNumber

  -- | The handshake server returned a 'HandshakeException'.  Only
  -- constructible when @muxMode@ has a responder side.
  HandleHandshakeServerError
    :: HasResponder muxMode ~ True
    => !(HandshakeException versionNumber)
    -> HandleError muxMode versionNumber

  -- | Any other exception caught while the handshake was running.
  HandleError
    :: !SomeException
    -> HandleError muxMode versionNumber
-- | Renders the constructor name followed by a space and the 'show' of the
-- wrapped error.  The payload is not parenthesised.
instance Show versionNumber
      => Show (HandleError muxMode versionNumber) where
    show (HandleHandshakeServerError err) = "HandleHandshakeServerError " ++ show err
    show (HandleHandshakeClientError err) = "HandleHandshakeClientError " ++ show err
    show (HandleError err)                = "HandleError " ++ show err
-- | Map a 'HandleError' onto a 'HandleErrorType'.
--
-- * a 'HandshakeProtocolLimit' failure (client or server side) is a
--   'HandshakeProtocolViolation';
-- * a 'HandshakeProtocolError' (client or server side) is a
--   'HandshakeFailure';
-- * any other exception ('HandleError') is a 'HandshakeProtocolViolation'.
classifyHandleError :: HandleError muxMode versionNumber
                    -> HandleErrorType
classifyHandleError (HandleHandshakeClientError (HandshakeProtocolLimit _)) =
    HandshakeProtocolViolation
classifyHandleError (HandleHandshakeClientError (HandshakeProtocolError _)) =
    HandshakeFailure
classifyHandleError (HandleHandshakeServerError (HandshakeProtocolLimit _)) =
    HandshakeProtocolViolation
classifyHandleError (HandleHandshakeServerError (HandshakeProtocolError _)) =
    HandshakeFailure
-- Exceptions that did not come out of the handshake result itself are
-- classified the same way as protocol-limit failures.
classifyHandleError (HandleError _) =
    HandshakeProtocolViolation
-- | 'ConnectionHandler' specialised to this module's types: it traces
-- 'ConnectionHandlerTrace', yields a 'Handle' on success and a
-- 'HandleError' on failure.
type MuxConnectionHandler muxMode socket initiatorCtx responderCtx peerAddr versionNumber versionData bytes m a b =
  ConnectionHandler
    muxMode
    (ConnectionHandlerTrace versionNumber versionData)
    socket
    peerAddr
    (Handle muxMode initiatorCtx responderCtx versionData bytes m a b)
    (HandleError muxMode versionNumber)
    versionNumber
    versionData
    m
-- | 'ConnectionManager' whose handle type is 'Handle' and whose error type
-- is 'HandleError'.
--
-- Note the parameter order: @versionData@ comes before @versionNumber@ here,
-- the opposite of 'MuxConnectionHandler'.
type MuxConnectionManager muxMode socket initiatorCtx responderCtx peerAddr versionData versionNumber bytes m a b =
  ConnectionManager
    muxMode
    socket
    peerAddr
    (Handle muxMode initiatorCtx responderCtx versionData bytes m a b)
    (HandleError muxMode versionNumber)
    m
-- | 'ConnectionManager' whose handle type is 'HandleWithExpandedCtx' and
-- whose error type is 'HandleError'.
--
-- As in 'MuxConnectionManager', @versionData@ comes before @versionNumber@
-- in the parameter list.
type ConnectionManagerWithExpandedCtx muxMode socket peerAddr versionData versionNumber bytes m a b =
  ConnectionManager
    muxMode
    socket
    peerAddr
    (HandleWithExpandedCtx muxMode peerAddr versionData bytes m a b)
    (HandleError muxMode versionNumber)
    m
makeConnectionHandler
:: forall initiatorCtx responderCtx peerAddr muxMode socket versionNumber versionData m a b.
( Alternative (STM m)
, MonadAsync m
, MonadDelay m
, MonadFork m
, MonadLabelledSTM m
, MonadThrow (STM m)
, MonadTimer m
, MonadMask m
, Ord versionNumber
, Show peerAddr
, Typeable peerAddr
)
=> Tracer m (Mx.WithBearer (ConnectionId peerAddr) Mx.Trace)
-> SingMuxMode muxMode
-> HandshakeArguments (ConnectionId peerAddr) versionNumber versionData m
-> Versions versionNumber versionData
(OuroborosBundle muxMode initiatorCtx responderCtx ByteString m a b)
-> (ThreadId m, RethrowPolicy)
-> MuxConnectionHandler muxMode socket initiatorCtx responderCtx peerAddr versionNumber versionData ByteString m a b
makeConnectionHandler :: forall initiatorCtx responderCtx peerAddr (muxMode :: Mode) socket
versionNumber versionData (m :: * -> *) a b.
(Alternative (STM m), MonadAsync m, MonadDelay m, MonadFork m,
MonadLabelledSTM m, MonadThrow (STM m), MonadTimer m, MonadMask m,
Ord versionNumber, Show peerAddr, Typeable peerAddr) =>
Tracer m (WithBearer (ConnectionId peerAddr) Trace)
-> SingMuxMode muxMode
-> HandshakeArguments
(ConnectionId peerAddr) versionNumber versionData m
-> Versions
versionNumber
versionData
(OuroborosBundle
muxMode initiatorCtx responderCtx ByteString m a b)
-> (ThreadId m, RethrowPolicy)
-> MuxConnectionHandler
muxMode
socket
initiatorCtx
responderCtx
peerAddr
versionNumber
versionData
ByteString
m
a
b
makeConnectionHandler Tracer m (WithBearer (ConnectionId peerAddr) Trace)
muxTracer SingMuxMode muxMode
singMuxMode
HandshakeArguments
(ConnectionId peerAddr) versionNumber versionData m
handshakeArguments
Versions
versionNumber
versionData
(OuroborosBundle
muxMode initiatorCtx responderCtx ByteString m a b)
versionedApplication
(ThreadId m
mainThreadId, RethrowPolicy
rethrowPolicy) =
ConnectionHandler {
connectionHandler :: WithMuxTuple
muxMode
(ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m)
connectionHandler =
case SingMuxMode muxMode
singMuxMode of
SingMuxMode muxMode
SingInitiatorMode ->
ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
-> WithMuxMode
'InitiatorMode
(ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m)
(ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m)
forall a b. a -> WithMuxMode 'InitiatorMode a b
WithInitiatorMode (HasInitiator muxMode ~ 'True) =>
ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
outboundConnectionHandler
SingMuxMode muxMode
SingResponderMode ->
ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
-> WithMuxMode
'ResponderMode
(ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m)
(ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m)
forall b a. b -> WithMuxMode 'ResponderMode a b
WithResponderMode (HasResponder muxMode ~ 'True) =>
ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
inboundConnectionHandler
SingMuxMode muxMode
SingInitiatorResponderMode ->
ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
-> ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
-> WithMuxMode
'InitiatorResponderMode
(ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m)
(ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m)
forall a b. a -> b -> WithMuxMode 'InitiatorResponderMode a b
WithInitiatorResponderMode (HasInitiator muxMode ~ 'True) =>
ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
outboundConnectionHandler
(HasResponder muxMode ~ 'True) =>
ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
inboundConnectionHandler
}
where
classifyExceptions :: forall x.
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> peerAddr
-> ErrorContext
-> m x -> m x
classifyExceptions :: forall x.
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> peerAddr -> ErrorContext -> m x -> m x
classifyExceptions Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer peerAddr
remoteAddress ErrorContext
ctx m x
io =
(SomeException -> Maybe SomeException)
-> m x -> (SomeException -> m x) -> m x
forall e b a.
Exception e =>
(e -> Maybe b) -> m a -> (b -> m a) -> m a
forall (m :: * -> *) e b a.
(MonadCatch m, Exception e) =>
(e -> Maybe b) -> m a -> (b -> m a) -> m a
catchJust
(\SomeException
e -> case SomeException -> Maybe SomeAsyncException
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e :: Maybe SomeAsyncException of
Just SomeAsyncException
_ -> Maybe SomeException
forall a. Maybe a
Nothing
Maybe SomeAsyncException
Nothing -> SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just SomeException
e)
m x
io
((SomeException -> m x) -> m x) -> (SomeException -> m x) -> m x
forall a b. (a -> b) -> a -> b
$ \SomeException
err -> do
let cmd :: ErrorCommand
cmd = RethrowPolicy -> ErrorContext -> SomeException -> ErrorCommand
runRethrowPolicy RethrowPolicy
rethrowPolicy ErrorContext
ctx SomeException
err
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> ConnectionHandlerTrace versionNumber versionData -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer (ErrorContext
-> SomeException
-> ErrorCommand
-> ConnectionHandlerTrace versionNumber versionData
forall versionNumber versionData.
ErrorContext
-> SomeException
-> ErrorCommand
-> ConnectionHandlerTrace versionNumber versionData
TrConnectionHandlerError ErrorContext
ctx SomeException
err ErrorCommand
cmd)
case ErrorCommand
cmd of
ErrorCommand
ShutdownNode -> do
ThreadId m -> ExceptionInHandler -> m ()
forall e. Exception e => ThreadId m -> e -> m ()
forall (m :: * -> *) e.
(MonadFork m, Exception e) =>
ThreadId m -> e -> m ()
throwTo ThreadId m
mainThreadId (peerAddr -> SomeException -> ExceptionInHandler
forall peerAddr.
(Typeable peerAddr, Show peerAddr) =>
peerAddr -> SomeException -> ExceptionInHandler
ExceptionInHandler peerAddr
remoteAddress SomeException
err)
SomeException -> m x
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SomeException
err
ErrorCommand
ShutdownPeer ->
ExceptionInHandler -> m x
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (peerAddr -> SomeException -> ExceptionInHandler
forall peerAddr.
(Typeable peerAddr, Show peerAddr) =>
peerAddr -> SomeException -> ExceptionInHandler
ExceptionInHandler peerAddr
remoteAddress SomeException
err)
outboundConnectionHandler
:: HasInitiator muxMode ~ True
=> ConnectionHandlerFn (ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
outboundConnectionHandler :: (HasInitiator muxMode ~ 'True) =>
ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
outboundConnectionHandler versionData -> versionData
versionDataFn
socket
socket
PromiseWriter { Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
-> STM m ()
writePromise :: Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
-> STM m ()
writePromise :: forall (m :: * -> *) a. PromiseWriter m a -> a -> STM m ()
writePromise }
Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer
connectionId :: ConnectionId peerAddr
connectionId@ConnectionId { peerAddr
localAddress :: peerAddr
localAddress :: forall addr. ConnectionId addr -> addr
localAddress
, peerAddr
remoteAddress :: peerAddr
remoteAddress :: forall addr. ConnectionId addr -> addr
remoteAddress }
DiffTime -> socket -> m (Bearer m)
mkMuxBearer
= MaskedAction { (forall x. m x -> m x) -> m ()
runWithUnmask :: (forall x. m x -> m x) -> m ()
runWithUnmask :: (forall x. m x -> m x) -> m ()
runWithUnmask }
where
runWithUnmask :: (forall x. m x -> m x) -> m ()
runWithUnmask :: (forall x. m x -> m x) -> m ()
runWithUnmask forall x. m x -> m x
unmask =
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> peerAddr -> ErrorContext -> m () -> m ()
forall x.
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> peerAddr -> ErrorContext -> m x -> m x
classifyExceptions Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer peerAddr
remoteAddress ErrorContext
OutboundError (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
String -> m ()
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread ([String] -> String
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [String
"out-conn-hndlr-"
, peerAddr -> String
forall a. Show a => a -> String
show peerAddr
localAddress
, String
"-"
, peerAddr -> String
forall a. Show a => a -> String
show peerAddr
remoteAddress
])
handshakeBearer <- DiffTime -> socket -> m (Bearer m)
mkMuxBearer DiffTime
sduHandshakeTimeout socket
socket
hsResult <-
unmask (runHandshakeClient handshakeBearer
connectionId
handshakeArguments
(Handshake.updateVersionData versionDataFn versionedApplication))
`catch` \(SomeException
err :: SomeException) -> do
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
-> STM m ()
writePromise (HandleError muxMode versionNumber
-> Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
forall a b. a -> Either a b
Left (SomeException -> HandleError muxMode versionNumber
forall (muxMode :: Mode) versionNumber.
SomeException -> HandleError muxMode versionNumber
HandleError SomeException
err))
SomeException
-> m (Either
(HandshakeException versionNumber)
(HandshakeResult
(OuroborosBundle
muxMode initiatorCtx responderCtx ByteString m a b)
versionNumber
versionData))
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SomeException
err
case hsResult of
Left !HandshakeException versionNumber
err -> do
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
-> STM m ()
writePromise (HandleError muxMode versionNumber
-> Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
forall a b. a -> Either a b
Left (HandshakeException versionNumber
-> HandleError muxMode versionNumber
forall (muxMode :: Mode) versionNumber.
(HasInitiator muxMode ~ 'True) =>
HandshakeException versionNumber
-> HandleError muxMode versionNumber
HandleHandshakeClientError HandshakeException versionNumber
err))
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> ConnectionHandlerTrace versionNumber versionData -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer (HandshakeException versionNumber
-> ConnectionHandlerTrace versionNumber versionData
forall versionNumber versionData.
HandshakeException versionNumber
-> ConnectionHandlerTrace versionNumber versionData
TrHandshakeClientError HandshakeException versionNumber
err)
Right (HandshakeNegotiationResult OuroborosBundle muxMode initiatorCtx responderCtx ByteString m a b
app versionNumber
versionNumber versionData
agreedOptions) ->
m () -> m ()
forall x. m x -> m x
unmask (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> ConnectionHandlerTrace versionNumber versionData -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer (versionNumber
-> versionData -> ConnectionHandlerTrace versionNumber versionData
forall versionNumber versionData.
versionNumber
-> versionData -> ConnectionHandlerTrace versionNumber versionData
TrHandshakeSuccess versionNumber
versionNumber versionData
agreedOptions)
controlMessageBundle
<- (\StrictTVar m ControlMessage
a StrictTVar m ControlMessage
b StrictTVar m ControlMessage
c -> WithProtocolTemperature 'Hot (StrictTVar m ControlMessage)
-> WithProtocolTemperature 'Warm (StrictTVar m ControlMessage)
-> WithProtocolTemperature
'Established (StrictTVar m ControlMessage)
-> TemperatureBundle (StrictTVar m ControlMessage)
forall a.
WithProtocolTemperature 'Hot a
-> WithProtocolTemperature 'Warm a
-> WithProtocolTemperature 'Established a
-> TemperatureBundle a
TemperatureBundle (StrictTVar m ControlMessage
-> WithProtocolTemperature 'Hot (StrictTVar m ControlMessage)
forall a. a -> WithProtocolTemperature 'Hot a
WithHot StrictTVar m ControlMessage
a) (StrictTVar m ControlMessage
-> WithProtocolTemperature 'Warm (StrictTVar m ControlMessage)
forall a. a -> WithProtocolTemperature 'Warm a
WithWarm StrictTVar m ControlMessage
b) (StrictTVar m ControlMessage
-> WithProtocolTemperature
'Established (StrictTVar m ControlMessage)
forall a. a -> WithProtocolTemperature 'Established a
WithEstablished StrictTVar m ControlMessage
c))
(StrictTVar m ControlMessage
-> StrictTVar m ControlMessage
-> StrictTVar m ControlMessage
-> TemperatureBundle (StrictTVar m ControlMessage))
-> m (StrictTVar m ControlMessage)
-> m (StrictTVar m ControlMessage
-> StrictTVar m ControlMessage
-> TemperatureBundle (StrictTVar m ControlMessage))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ControlMessage -> m (StrictTVar m ControlMessage)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ControlMessage
Continue
m (StrictTVar m ControlMessage
-> StrictTVar m ControlMessage
-> TemperatureBundle (StrictTVar m ControlMessage))
-> m (StrictTVar m ControlMessage)
-> m (StrictTVar m ControlMessage
-> TemperatureBundle (StrictTVar m ControlMessage))
forall a b. m (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> ControlMessage -> m (StrictTVar m ControlMessage)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ControlMessage
Continue
m (StrictTVar m ControlMessage
-> TemperatureBundle (StrictTVar m ControlMessage))
-> m (StrictTVar m ControlMessage)
-> m (TemperatureBundle (StrictTVar m ControlMessage))
forall a b. m (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> ControlMessage -> m (StrictTVar m ControlMessage)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ControlMessage
Continue
mux <- Mx.new (mkMiniProtocolInfos app)
let !handle = Handle {
hMux :: Mux muxMode m
hMux = Mux muxMode m
mux,
hMuxBundle :: OuroborosBundle muxMode initiatorCtx responderCtx ByteString m a b
hMuxBundle = OuroborosBundle muxMode initiatorCtx responderCtx ByteString m a b
app,
hControlMessage :: TemperatureBundle (StrictTVar m ControlMessage)
hControlMessage = TemperatureBundle (StrictTVar m ControlMessage)
controlMessageBundle,
hVersionData :: versionData
hVersionData = versionData
agreedOptions
}
atomically $ writePromise (Right $ HandshakeConnectionResult handle (versionNumber, agreedOptions))
bearer <- mkMuxBearer sduTimeout socket
Mx.run (Mx.WithBearer connectionId `contramap` muxTracer)
mux bearer
Right (HandshakeQueryResult Map versionNumber (Either Text versionData)
vMap) -> do
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
-> STM m ()
writePromise (HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData)
-> Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
forall a b. b -> Either a b
Right HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData)
forall handle version. HandshakeConnectionResult handle version
HandshakeConnectionQuery)
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> ConnectionHandlerTrace versionNumber versionData -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer (ConnectionHandlerTrace versionNumber versionData -> m ())
-> ConnectionHandlerTrace versionNumber versionData -> m ()
forall a b. (a -> b) -> a -> b
$ Map versionNumber (Either Text versionData)
-> ConnectionHandlerTrace versionNumber versionData
forall versionNumber versionData.
Map versionNumber (Either Text versionData)
-> ConnectionHandlerTrace versionNumber versionData
TrHandshakeQuery Map versionNumber (Either Text versionData)
vMap
inboundConnectionHandler
:: HasResponder muxMode ~ True
=> ConnectionHandlerFn (ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
inboundConnectionHandler :: (HasResponder muxMode ~ 'True) =>
ConnectionHandlerFn
(ConnectionHandlerTrace versionNumber versionData)
socket
peerAddr
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(HandleError muxMode versionNumber)
versionNumber
versionData
m
inboundConnectionHandler versionData -> versionData
updateVersionDataFn
socket
socket
PromiseWriter { Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
-> STM m ()
writePromise :: forall (m :: * -> *) a. PromiseWriter m a -> a -> STM m ()
writePromise :: Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
-> STM m ()
writePromise }
Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer
connectionId :: ConnectionId peerAddr
connectionId@ConnectionId { peerAddr
localAddress :: forall addr. ConnectionId addr -> addr
localAddress :: peerAddr
localAddress
, peerAddr
remoteAddress :: forall addr. ConnectionId addr -> addr
remoteAddress :: peerAddr
remoteAddress }
DiffTime -> socket -> m (Bearer m)
mkMuxBearer
= MaskedAction { (forall x. m x -> m x) -> m ()
runWithUnmask :: (forall x. m x -> m x) -> m ()
runWithUnmask :: (forall x. m x -> m x) -> m ()
runWithUnmask }
where
runWithUnmask :: (forall x. m x -> m x) -> m ()
runWithUnmask :: (forall x. m x -> m x) -> m ()
runWithUnmask forall x. m x -> m x
unmask =
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> peerAddr -> ErrorContext -> m () -> m ()
forall x.
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> peerAddr -> ErrorContext -> m x -> m x
classifyExceptions Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer peerAddr
remoteAddress ErrorContext
InboundError (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
String -> m ()
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread ([String] -> String
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [String
"in-conn-hndlr-"
, peerAddr -> String
forall a. Show a => a -> String
show peerAddr
localAddress
, String
"-"
, peerAddr -> String
forall a. Show a => a -> String
show peerAddr
remoteAddress
])
handshakeBearer <- DiffTime -> socket -> m (Bearer m)
mkMuxBearer DiffTime
sduHandshakeTimeout socket
socket
hsResult <-
unmask (runHandshakeServer handshakeBearer
connectionId
handshakeArguments
(Handshake.updateVersionData updateVersionDataFn versionedApplication))
`catch` \(SomeException
err :: SomeException) -> do
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
-> STM m ()
writePromise (HandleError muxMode versionNumber
-> Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
forall a b. a -> Either a b
Left (SomeException -> HandleError muxMode versionNumber
forall (muxMode :: Mode) versionNumber.
SomeException -> HandleError muxMode versionNumber
HandleError SomeException
err))
SomeException
-> m (Either
(HandshakeException versionNumber)
(HandshakeResult
(OuroborosBundle
muxMode initiatorCtx responderCtx ByteString m a b)
versionNumber
versionData))
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SomeException
err
case hsResult of
Left !HandshakeException versionNumber
err -> do
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
-> STM m ()
writePromise (HandleError muxMode versionNumber
-> Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
forall a b. a -> Either a b
Left (HandshakeException versionNumber
-> HandleError muxMode versionNumber
forall (muxMode :: Mode) versionNumber.
(HasResponder muxMode ~ 'True) =>
HandshakeException versionNumber
-> HandleError muxMode versionNumber
HandleHandshakeServerError HandshakeException versionNumber
err))
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> ConnectionHandlerTrace versionNumber versionData -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer (HandshakeException versionNumber
-> ConnectionHandlerTrace versionNumber versionData
forall versionNumber versionData.
HandshakeException versionNumber
-> ConnectionHandlerTrace versionNumber versionData
TrHandshakeServerError HandshakeException versionNumber
err)
Right (HandshakeNegotiationResult OuroborosBundle muxMode initiatorCtx responderCtx ByteString m a b
app versionNumber
versionNumber versionData
agreedOptions) ->
m () -> m ()
forall x. m x -> m x
unmask (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> ConnectionHandlerTrace versionNumber versionData -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer (versionNumber
-> versionData -> ConnectionHandlerTrace versionNumber versionData
forall versionNumber versionData.
versionNumber
-> versionData -> ConnectionHandlerTrace versionNumber versionData
TrHandshakeSuccess versionNumber
versionNumber versionData
agreedOptions)
controlMessageBundle
<- (\StrictTVar m ControlMessage
a StrictTVar m ControlMessage
b StrictTVar m ControlMessage
c -> WithProtocolTemperature 'Hot (StrictTVar m ControlMessage)
-> WithProtocolTemperature 'Warm (StrictTVar m ControlMessage)
-> WithProtocolTemperature
'Established (StrictTVar m ControlMessage)
-> TemperatureBundle (StrictTVar m ControlMessage)
forall a.
WithProtocolTemperature 'Hot a
-> WithProtocolTemperature 'Warm a
-> WithProtocolTemperature 'Established a
-> TemperatureBundle a
TemperatureBundle (StrictTVar m ControlMessage
-> WithProtocolTemperature 'Hot (StrictTVar m ControlMessage)
forall a. a -> WithProtocolTemperature 'Hot a
WithHot StrictTVar m ControlMessage
a) (StrictTVar m ControlMessage
-> WithProtocolTemperature 'Warm (StrictTVar m ControlMessage)
forall a. a -> WithProtocolTemperature 'Warm a
WithWarm StrictTVar m ControlMessage
b) (StrictTVar m ControlMessage
-> WithProtocolTemperature
'Established (StrictTVar m ControlMessage)
forall a. a -> WithProtocolTemperature 'Established a
WithEstablished StrictTVar m ControlMessage
c))
(StrictTVar m ControlMessage
-> StrictTVar m ControlMessage
-> StrictTVar m ControlMessage
-> TemperatureBundle (StrictTVar m ControlMessage))
-> m (StrictTVar m ControlMessage)
-> m (StrictTVar m ControlMessage
-> StrictTVar m ControlMessage
-> TemperatureBundle (StrictTVar m ControlMessage))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ControlMessage -> m (StrictTVar m ControlMessage)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ControlMessage
Continue
m (StrictTVar m ControlMessage
-> StrictTVar m ControlMessage
-> TemperatureBundle (StrictTVar m ControlMessage))
-> m (StrictTVar m ControlMessage)
-> m (StrictTVar m ControlMessage
-> TemperatureBundle (StrictTVar m ControlMessage))
forall a b. m (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> ControlMessage -> m (StrictTVar m ControlMessage)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ControlMessage
Continue
m (StrictTVar m ControlMessage
-> TemperatureBundle (StrictTVar m ControlMessage))
-> m (StrictTVar m ControlMessage)
-> m (TemperatureBundle (StrictTVar m ControlMessage))
forall a b. m (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> ControlMessage -> m (StrictTVar m ControlMessage)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ControlMessage
Continue
mux <- Mx.new (mkMiniProtocolInfos app)
let !handle = Handle {
hMux :: Mux muxMode m
hMux = Mux muxMode m
mux,
hMuxBundle :: OuroborosBundle muxMode initiatorCtx responderCtx ByteString m a b
hMuxBundle = OuroborosBundle muxMode initiatorCtx responderCtx ByteString m a b
app,
hControlMessage :: TemperatureBundle (StrictTVar m ControlMessage)
hControlMessage = TemperatureBundle (StrictTVar m ControlMessage)
controlMessageBundle,
hVersionData :: versionData
hVersionData = versionData
agreedOptions
}
atomically $ writePromise (Right $ HandshakeConnectionResult handle (versionNumber, agreedOptions))
bearer <- mkMuxBearer sduTimeout socket
Mx.run (Mx.WithBearer connectionId `contramap` muxTracer)
mux bearer
Right (HandshakeQueryResult Map versionNumber (Either Text versionData)
vMap) -> do
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
-> STM m ()
writePromise (HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData)
-> Either
(HandleError muxMode versionNumber)
(HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData))
forall a b. b -> Either a b
Right HandshakeConnectionResult
(Handle
muxMode initiatorCtx responderCtx versionData ByteString m a b)
(versionNumber, versionData)
forall handle version. HandshakeConnectionResult handle version
HandshakeConnectionQuery)
Tracer m (ConnectionHandlerTrace versionNumber versionData)
-> ConnectionHandlerTrace versionNumber versionData -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (ConnectionHandlerTrace versionNumber versionData)
tracer (ConnectionHandlerTrace versionNumber versionData -> m ())
-> ConnectionHandlerTrace versionNumber versionData -> m ()
forall a b. (a -> b) -> a -> b
$ Map versionNumber (Either Text versionData)
-> ConnectionHandlerTrace versionNumber versionData
forall versionNumber versionData.
Map versionNumber (Either Text versionData)
-> ConnectionHandlerTrace versionNumber versionData
TrHandshakeQuery Map versionNumber (Either Text versionData)
vMap
DiffTime -> m ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
handshake_QUERY_SHUTDOWN_DELAY
-- | Events traced by the connection handler while it runs the handshake on
-- a connection.
--
-- The type is parameterised over the handshake's @versionNumber@ and
-- @versionData@ so that it can be reused with any versioned protocol suite.
--
data ConnectionHandlerTrace versionNumber versionData =
      -- | The handshake negotiated a version: carries the agreed version
      -- number together with the agreed version data.
      TrHandshakeSuccess versionNumber versionData

      -- | The handshake ended with a query result rather than a negotiated
      -- version: carries the map of version numbers to either an error text
      -- or the corresponding version data.
    | TrHandshakeQuery (Map versionNumber (Either Text versionData))

      -- | A 'HandshakeException' tagged as a client-side handshake failure.
      -- NOTE(review): presumably the counterpart of 'TrHandshakeServerError'
      -- for the side that initiates the handshake -- confirm at the call
      -- site.
    | TrHandshakeClientError
        (HandshakeException versionNumber)

      -- | Running the handshake server returned a 'HandshakeException'.
    | TrHandshakeServerError
        (HandshakeException versionNumber)

      -- | An exception reported by the connection handler, together with an
      -- 'ErrorContext' and an 'ErrorCommand'.
      -- NOTE(review): presumably the command is the outcome of applying the
      -- rethrow policy to the exception -- confirm at the call site.
    | TrConnectionHandlerError ErrorContext SomeException ErrorCommand
  deriving Show