{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DisambiguateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeOperators #-}
{-# OPTIONS_GHC -Wno-redundant-constraints #-}
#if __GLASGOW_HASKELL__ >= 908
{-# OPTIONS_GHC -Wno-x-partial #-}
#endif
module Test.Ouroboros.Network.ConnectionManager.Experiments
( ClientAndServerData (..)
, unidirectionalExperiment
, bidirectionalExperiment
, ConnectionManagerMonad
, withInitiatorOnlyConnectionManager
, withBidirectionalConnectionManager
, runInitiatorProtocols
, oneshotNextRequests
, WithNameAndBearer
) where
import Control.Applicative (Alternative)
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Exception (AssertionFailed)
import Control.Monad (replicateM, (>=>))
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadSay
import Control.Monad.Class.MonadST (MonadST)
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadTimer.SI
import Control.Monad.Fix (MonadFix)
import Control.Tracer (Tracer (..), contramap, nullTracer)
import Codec.Serialise.Class (Serialise)
import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy qualified as LBS
import Data.Functor (($>), (<&>))
import Data.Functor.Compose
import Data.Hashable
import Data.List (mapAccumL)
import Data.List.NonEmpty (NonEmpty (..))
import Data.Proxy (Proxy (..))
import Data.Typeable (Typeable)
import Data.Void (Void)
import System.Random (StdGen)
import System.Random qualified as Random
import Test.QuickCheck
import Codec.CBOR.Term (Term)
import Network.Mux qualified as Mx
import Network.TypedProtocol.Core
import Network.TypedProtocol.Peer.Client
import Network.TypedProtocol.ReqResp.Client
import Network.TypedProtocol.ReqResp.Codec.CBOR
import Network.TypedProtocol.ReqResp.Examples
import Network.TypedProtocol.ReqResp.Server
import Network.TypedProtocol.ReqResp.Type as ReqResp
import Ouroboros.Network.ConnectionHandler
import Ouroboros.Network.ConnectionId
import Ouroboros.Network.ConnectionManager.Core qualified as CM
import Ouroboros.Network.ConnectionManager.State qualified as CM
import Ouroboros.Network.ConnectionManager.Types
import Ouroboros.Network.Context
import Ouroboros.Network.ControlMessage
import Ouroboros.Network.Driver.Limits
import Ouroboros.Network.InboundGovernor qualified as InboundGovernor
import Ouroboros.Network.InboundGovernor.InformationChannel
(newInformationChannel)
import Ouroboros.Network.Mux
import Ouroboros.Network.MuxMode
import Ouroboros.Network.NodeToNode.Version (DiffusionMode (..))
import Ouroboros.Network.Protocol.Handshake
import Ouroboros.Network.Protocol.Handshake.Unversioned
import Ouroboros.Network.RethrowPolicy
import Ouroboros.Network.Server (RemoteTransitionTrace)
import Ouroboros.Network.Server qualified as Server
import Ouroboros.Network.Server.RateLimiting (AcceptedConnectionsLimit (..))
import Ouroboros.Network.Snocket (Snocket)
import Ouroboros.Network.Snocket qualified as Snocket
import Test.Ouroboros.Network.ConnectionManager.Timeouts
import Test.Ouroboros.Network.Orphans ()
import Test.Ouroboros.Network.Utils (WithName (..))
data ClientAndServerData req = ClientAndServerData {
forall req. ClientAndServerData req -> req
accumulatorInit :: req,
forall req. ClientAndServerData req -> [[req]]
hotInitiatorRequests :: [[req]],
forall req. ClientAndServerData req -> [[req]]
warmInitiatorRequests :: [[req]],
forall req. ClientAndServerData req -> [[req]]
establishedInitiatorRequests :: [[req]]
}
deriving Int -> ClientAndServerData req -> ShowS
[ClientAndServerData req] -> ShowS
ClientAndServerData req -> String
(Int -> ClientAndServerData req -> ShowS)
-> (ClientAndServerData req -> String)
-> ([ClientAndServerData req] -> ShowS)
-> Show (ClientAndServerData req)
forall req. Show req => Int -> ClientAndServerData req -> ShowS
forall req. Show req => [ClientAndServerData req] -> ShowS
forall req. Show req => ClientAndServerData req -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: forall req. Show req => Int -> ClientAndServerData req -> ShowS
showsPrec :: Int -> ClientAndServerData req -> ShowS
$cshow :: forall req. Show req => ClientAndServerData req -> String
show :: ClientAndServerData req -> String
$cshowList :: forall req. Show req => [ClientAndServerData req] -> ShowS
showList :: [ClientAndServerData req] -> ShowS
Show
numberOfRounds :: ClientAndServerData req -> Int
numberOfRounds :: forall req. ClientAndServerData req -> Int
numberOfRounds ClientAndServerData {
[[req]]
hotInitiatorRequests :: forall req. ClientAndServerData req -> [[req]]
hotInitiatorRequests :: [[req]]
hotInitiatorRequests,
[[req]]
warmInitiatorRequests :: forall req. ClientAndServerData req -> [[req]]
warmInitiatorRequests :: [[req]]
warmInitiatorRequests,
[[req]]
establishedInitiatorRequests :: forall req. ClientAndServerData req -> [[req]]
establishedInitiatorRequests :: [[req]]
establishedInitiatorRequests
} =
[[req]] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [[req]]
hotInitiatorRequests
Int -> Int -> Int
forall a. Ord a => a -> a -> a
`max`
[[req]] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [[req]]
warmInitiatorRequests
Int -> Int -> Int
forall a. Ord a => a -> a -> a
`max`
[[req]] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [[req]]
establishedInitiatorRequests
arbitraryList :: Arbitrary a => Gen [[a]]
arbitraryList :: forall a. Arbitrary a => Gen [[a]]
arbitraryList =
Int -> Gen [[a]] -> Gen [[a]]
forall a. HasCallStack => Int -> Gen a -> Gen a
resize Int
3 (Gen [a] -> Gen [[a]]
forall a. Gen a -> Gen [a]
listOf (Int -> Gen [a] -> Gen [a]
forall a. HasCallStack => Int -> Gen a -> Gen a
resize Int
3 (Gen a -> Gen [a]
forall a. Gen a -> Gen [a]
listOf (Int -> Gen a -> Gen a
forall a. HasCallStack => Int -> Gen a -> Gen a
resize Int
100 Gen a
forall a. Arbitrary a => Gen a
arbitrary))))
instance Arbitrary req => Arbitrary (ClientAndServerData req) where
arbitrary :: Gen (ClientAndServerData req)
arbitrary =
req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
forall req.
req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
ClientAndServerData (req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req)
-> Gen req
-> Gen ([[req]] -> [[req]] -> [[req]] -> ClientAndServerData req)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Gen req
forall a. Arbitrary a => Gen a
arbitrary
Gen ([[req]] -> [[req]] -> [[req]] -> ClientAndServerData req)
-> Gen [[req]]
-> Gen ([[req]] -> [[req]] -> ClientAndServerData req)
forall a b. Gen (a -> b) -> Gen a -> Gen b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Gen [[req]]
forall a. Arbitrary a => Gen [[a]]
arbitraryList
Gen ([[req]] -> [[req]] -> ClientAndServerData req)
-> Gen [[req]] -> Gen ([[req]] -> ClientAndServerData req)
forall a b. Gen (a -> b) -> Gen a -> Gen b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Gen [[req]]
forall a. Arbitrary a => Gen [[a]]
arbitraryList
Gen ([[req]] -> ClientAndServerData req)
-> Gen [[req]] -> Gen (ClientAndServerData req)
forall a b. Gen (a -> b) -> Gen a -> Gen b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Gen [[req]]
forall a. Arbitrary a => Gen [[a]]
arbitraryList
shrink :: ClientAndServerData req -> [ClientAndServerData req]
shrink (ClientAndServerData req
ini [[req]]
hot [[req]]
warm [[req]]
est) = [[ClientAndServerData req]] -> [ClientAndServerData req]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat
[ req -> [req]
forall a. Arbitrary a => a -> [a]
shrink req
ini [req]
-> (req -> ClientAndServerData req) -> [ClientAndServerData req]
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \ req
ini' -> req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
forall req.
req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
ClientAndServerData req
ini' [[req]]
hot [[req]]
warm [[req]]
est
, [[req]] -> [[[req]]]
forall a. Arbitrary a => a -> [a]
shrink [[req]]
hot [[[req]]]
-> ([[req]] -> ClientAndServerData req)
-> [ClientAndServerData req]
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \ [[req]]
hot' -> req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
forall req.
req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
ClientAndServerData req
ini [[req]]
hot' [[req]]
warm [[req]]
est
, [[req]] -> [[[req]]]
forall a. Arbitrary a => a -> [a]
shrink [[req]]
warm [[[req]]]
-> ([[req]] -> ClientAndServerData req)
-> [ClientAndServerData req]
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \ [[req]]
warm' -> req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
forall req.
req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
ClientAndServerData req
ini [[req]]
hot [[req]]
warm' [[req]]
est
, [[req]] -> [[[req]]]
forall a. Arbitrary a => a -> [a]
shrink [[req]]
est [[[req]]]
-> ([[req]] -> ClientAndServerData req)
-> [ClientAndServerData req]
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \ [[req]]
est' -> req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
forall req.
req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
ClientAndServerData req
ini [[req]]
hot [[req]]
warm [[req]]
est'
]
expectedResult :: ClientAndServerData req
-> ClientAndServerData req
-> [TemperatureBundle [[req]]]
expectedResult :: forall req.
ClientAndServerData req
-> ClientAndServerData req -> [TemperatureBundle [[req]]]
expectedResult client :: ClientAndServerData req
client@ClientAndServerData
{ [[req]]
hotInitiatorRequests :: forall req. ClientAndServerData req -> [[req]]
hotInitiatorRequests :: [[req]]
hotInitiatorRequests
, [[req]]
warmInitiatorRequests :: forall req. ClientAndServerData req -> [[req]]
warmInitiatorRequests :: [[req]]
warmInitiatorRequests
, [[req]]
establishedInitiatorRequests :: forall req. ClientAndServerData req -> [[req]]
establishedInitiatorRequests :: [[req]]
establishedInitiatorRequests
}
ClientAndServerData { req
accumulatorInit :: forall req. ClientAndServerData req -> req
accumulatorInit :: req
accumulatorInit
} =
[[req]] -> [[req]] -> [[req]] -> [TemperatureBundle [[req]]]
go
(Int -> [[req]] -> [[req]]
forall a. Int -> [a] -> [a]
take Int
rounds ([[req]] -> [[req]]) -> [[req]] -> [[req]]
forall a b. (a -> b) -> a -> b
$ [[req]]
hotInitiatorRequests [[req]] -> [[req]] -> [[req]]
forall a. [a] -> [a] -> [a]
++ [req] -> [[req]]
forall a. a -> [a]
repeat [])
(Int -> [[req]] -> [[req]]
forall a. Int -> [a] -> [a]
take Int
rounds ([[req]] -> [[req]]) -> [[req]] -> [[req]]
forall a b. (a -> b) -> a -> b
$ [[req]]
warmInitiatorRequests [[req]] -> [[req]] -> [[req]]
forall a. [a] -> [a] -> [a]
++ [req] -> [[req]]
forall a. a -> [a]
repeat [])
(Int -> [[req]] -> [[req]]
forall a. Int -> [a] -> [a]
take Int
rounds ([[req]] -> [[req]]) -> [[req]] -> [[req]]
forall a b. (a -> b) -> a -> b
$ [[req]]
establishedInitiatorRequests [[req]] -> [[req]] -> [[req]]
forall a. [a] -> [a] -> [a]
++ [req] -> [[req]]
forall a. a -> [a]
repeat [])
where
rounds :: Int
rounds = ClientAndServerData req -> Int
forall req. ClientAndServerData req -> Int
numberOfRounds ClientAndServerData req
client
fn :: [a] -> a -> ([a], [a])
fn [a]
acc a
x = (a
x a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
acc, a
x a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
acc)
go :: [[req]] -> [[req]] -> [[req]] -> [TemperatureBundle [[req]]]
go ([req]
a : [[req]]
as) ([req]
b : [[req]]
bs) ([req]
c : [[req]]
cs) =
WithProtocolTemperature 'Hot [[req]]
-> WithProtocolTemperature 'Warm [[req]]
-> WithProtocolTemperature 'Established [[req]]
-> TemperatureBundle [[req]]
forall a.
WithProtocolTemperature 'Hot a
-> WithProtocolTemperature 'Warm a
-> WithProtocolTemperature 'Established a
-> TemperatureBundle a
TemperatureBundle
([[req]] -> WithProtocolTemperature 'Hot [[req]]
forall a. a -> WithProtocolTemperature 'Hot a
WithHot (([req], [[req]]) -> [[req]]
forall a b. (a, b) -> b
snd (([req], [[req]]) -> [[req]]) -> ([req], [[req]]) -> [[req]]
forall a b. (a -> b) -> a -> b
$ ([req] -> req -> ([req], [req]))
-> [req] -> [req] -> ([req], [[req]])
forall (t :: * -> *) s a b.
Traversable t =>
(s -> a -> (s, b)) -> s -> t a -> (s, t b)
mapAccumL [req] -> req -> ([req], [req])
forall {a}. [a] -> a -> ([a], [a])
fn [req
accumulatorInit] [req]
a))
([[req]] -> WithProtocolTemperature 'Warm [[req]]
forall a. a -> WithProtocolTemperature 'Warm a
WithWarm (([req], [[req]]) -> [[req]]
forall a b. (a, b) -> b
snd (([req], [[req]]) -> [[req]]) -> ([req], [[req]]) -> [[req]]
forall a b. (a -> b) -> a -> b
$ ([req] -> req -> ([req], [req]))
-> [req] -> [req] -> ([req], [[req]])
forall (t :: * -> *) s a b.
Traversable t =>
(s -> a -> (s, b)) -> s -> t a -> (s, t b)
mapAccumL [req] -> req -> ([req], [req])
forall {a}. [a] -> a -> ([a], [a])
fn [req
accumulatorInit] [req]
b))
([[req]] -> WithProtocolTemperature 'Established [[req]]
forall a. a -> WithProtocolTemperature 'Established a
WithEstablished (([req], [[req]]) -> [[req]]
forall a b. (a, b) -> b
snd (([req], [[req]]) -> [[req]]) -> ([req], [[req]]) -> [[req]]
forall a b. (a -> b) -> a -> b
$ ([req] -> req -> ([req], [req]))
-> [req] -> [req] -> ([req], [[req]])
forall (t :: * -> *) s a b.
Traversable t =>
(s -> a -> (s, b)) -> s -> t a -> (s, t b)
mapAccumL [req] -> req -> ([req], [req])
forall {a}. [a] -> a -> ([a], [a])
fn [req
accumulatorInit] [req]
c))
TemperatureBundle [[req]]
-> [TemperatureBundle [[req]]] -> [TemperatureBundle [[req]]]
forall a. a -> [a] -> [a]
: [[req]] -> [[req]] -> [[req]] -> [TemperatureBundle [[req]]]
go [[req]]
as [[req]]
bs [[req]]
cs
go [] [] [] = []
go [[req]]
_ [[req]]
_ [[req]]
_ = String -> [TemperatureBundle [[req]]]
forall a. HasCallStack => String -> a
error String
"expectedResult: impossible happened"
noNextRequests :: forall stm req peerAddr. Applicative stm => TemperatureBundle (ConnectionId peerAddr -> stm [req])
noNextRequests :: forall (stm :: * -> *) req peerAddr.
Applicative stm =>
TemperatureBundle (ConnectionId peerAddr -> stm [req])
noNextRequests = (ConnectionId peerAddr -> stm [req])
-> TemperatureBundle (ConnectionId peerAddr -> stm [req])
forall a. a -> TemperatureBundle a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((ConnectionId peerAddr -> stm [req])
-> TemperatureBundle (ConnectionId peerAddr -> stm [req]))
-> (ConnectionId peerAddr -> stm [req])
-> TemperatureBundle (ConnectionId peerAddr -> stm [req])
forall a b. (a -> b) -> a -> b
$ \ConnectionId peerAddr
_ -> [req] -> stm [req]
forall a. a -> stm a
forall (f :: * -> *) a. Applicative f => a -> f a
pure []
oneshotNextRequests
:: forall req peerAddr m. MonadSTM m
=> ClientAndServerData req
-> m (TemperatureBundle (ConnectionId peerAddr -> STM m [req]))
oneshotNextRequests :: forall req peerAddr (m :: * -> *).
MonadSTM m =>
ClientAndServerData req
-> m (TemperatureBundle (ConnectionId peerAddr -> STM m [req]))
oneshotNextRequests ClientAndServerData {
[[req]]
hotInitiatorRequests :: forall req. ClientAndServerData req -> [[req]]
hotInitiatorRequests :: [[req]]
hotInitiatorRequests,
[[req]]
warmInitiatorRequests :: forall req. ClientAndServerData req -> [[req]]
warmInitiatorRequests :: [[req]]
warmInitiatorRequests,
[[req]]
establishedInitiatorRequests :: forall req. ClientAndServerData req -> [[req]]
establishedInitiatorRequests :: [[req]]
establishedInitiatorRequests
} = do
hotRequestsVar <- [[req]] -> m (StrictTVar m [[req]])
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO [[req]]
hotInitiatorRequests
warmRequestsVar <- newTVarIO warmInitiatorRequests
establishedRequestsVar <- newTVarIO establishedInitiatorRequests
return $ TemperatureBundle
(WithHot hotRequestsVar)
(WithWarm warmRequestsVar)
(WithEstablished establishedRequestsVar)
<&> \ StrictTVar m [[req]]
reqVar ConnectionId peerAddr
_ -> StrictTVar m [[req]] -> STM m [req]
forall {m :: * -> *} {a}.
MonadSTM m =>
StrictTVar m [[a]] -> STM m [a]
popRequests StrictTVar m [[req]]
reqVar
where
popRequests :: StrictTVar m [[a]] -> STM m [a]
popRequests StrictTVar m [[a]]
requestsVar = do
requests <- StrictTVar m [[a]] -> STM m [[a]]
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m [[a]]
requestsVar
case requests of
[a]
reqs : [[a]]
rest -> StrictTVar m [[a]] -> [[a]] -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m [[a]]
requestsVar [[a]]
rest STM m () -> [a] -> STM m [a]
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> [a]
reqs
[] -> [a] -> STM m [a]
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure []
type ConnectionManagerMonad m =
( Alternative (STM m), MonadAsync m, MonadCatch m, MonadEvaluate m,
MonadFork m, MonadMask m, MonadST m, MonadTime m, MonadTimer m,
MonadThrow m, MonadThrow (STM m)
)
withInitiatorOnlyConnectionManager
:: forall name peerAddr socket req resp m a.
( ConnectionManagerMonad m
, resp ~ [req]
, Ord peerAddr, Show peerAddr, Typeable peerAddr
, Serialise req, Typeable req
, MonadAsync m
, MonadDelay m
, MonadFix m
, MonadLabelledSTM m
, MonadTraceSTM m
, MonadSay m, Show req
, Show name
)
=> name
-> Timeouts
-> Tracer m (WithName name (AbstractTransitionTrace CM.ConnStateId))
-> Tracer m (WithName name
(CM.Trace
peerAddr
(ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
-> StdGen
-> Snocket m socket peerAddr
-> Mx.MakeBearer m socket
-> CM.ConnStateIdSupply m
-> Maybe peerAddr
-> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
-> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
-> AcceptedConnectionsLimit
-> (ConnectionManagerWithExpandedCtx
Mx.InitiatorMode socket peerAddr
DataFlowProtocolData UnversionedProtocol ByteString m [resp] Void
-> m a)
-> m a
withInitiatorOnlyConnectionManager :: forall name peerAddr socket req resp (m :: * -> *) a.
(ConnectionManagerMonad m, resp ~ [req], Ord peerAddr,
Show peerAddr, Typeable peerAddr, Serialise req, Typeable req,
MonadAsync m, MonadDelay m, MonadFix m, MonadLabelledSTM m,
MonadTraceSTM m, MonadSay m, Show req, Show name) =>
name
-> Timeouts
-> Tracer m (WithName name (AbstractTransitionTrace ConnStateId))
-> Tracer
m
(WithName
name
(Trace
peerAddr
(ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
-> StdGen
-> Snocket m socket peerAddr
-> MakeBearer m socket
-> ConnStateIdSupply m
-> Maybe peerAddr
-> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
-> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
-> AcceptedConnectionsLimit
-> (ConnectionManagerWithExpandedCtx
'InitiatorMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
Void
-> m a)
-> m a
withInitiatorOnlyConnectionManager name
name Timeouts
timeouts Tracer m (WithName name (AbstractTransitionTrace ConnStateId))
trTracer Tracer
m
(WithName
name
(Trace
peerAddr
(ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
tracer StdGen
stdGen Snocket m socket peerAddr
snocket MakeBearer m socket
makeBearer ConnStateIdSupply m
connStateIdSupply
Maybe peerAddr
localAddr TemperatureBundle (ConnectionId peerAddr -> STM m [req])
nextRequests ProtocolTimeLimits (Handshake UnversionedProtocol Term)
handshakeTimeLimits AcceptedConnectionsLimit
acceptedConnLimit ConnectionManagerWithExpandedCtx
'InitiatorMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
Void
-> m a
k = do
mainThreadId <- m (ThreadId m)
forall (m :: * -> *). MonadThread m => m (ThreadId m)
myThreadId
let muxTracers :: Mx.TracersWithBearer (ConnectionId peerAddr) m
muxTracers = Mx.Tracers {
tracer :: Tracer m (WithBearer (ConnectionId peerAddr) Trace)
Mx.tracer = name
-> WithBearer (ConnectionId peerAddr) Trace
-> WithName name (WithBearer (ConnectionId peerAddr) Trace)
forall name event. name -> event -> WithName name event
WithName name
name (WithBearer (ConnectionId peerAddr) Trace
-> WithName name (WithBearer (ConnectionId peerAddr) Trace))
-> Tracer
m (WithName name (WithBearer (ConnectionId peerAddr) Trace))
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace)
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer m (WithName name (WithBearer (ConnectionId peerAddr) Trace))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer,
channelTracer :: Tracer m (WithBearer (ConnectionId peerAddr) ChannelTrace)
Mx.channelTracer = name
-> WithBearer (ConnectionId peerAddr) ChannelTrace
-> WithName name (WithBearer (ConnectionId peerAddr) ChannelTrace)
forall name event. name -> event -> WithName name event
WithName name
name (WithBearer (ConnectionId peerAddr) ChannelTrace
-> WithName name (WithBearer (ConnectionId peerAddr) ChannelTrace))
-> Tracer
m (WithName name (WithBearer (ConnectionId peerAddr) ChannelTrace))
-> Tracer m (WithBearer (ConnectionId peerAddr) ChannelTrace)
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer
m (WithName name (WithBearer (ConnectionId peerAddr) ChannelTrace))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer,
bearerTracer :: Tracer m (WithBearer (ConnectionId peerAddr) BearerTrace)
Mx.bearerTracer = name
-> WithBearer (ConnectionId peerAddr) BearerTrace
-> WithName name (WithBearer (ConnectionId peerAddr) BearerTrace)
forall name event. name -> event -> WithName name event
WithName name
name (WithBearer (ConnectionId peerAddr) BearerTrace
-> WithName name (WithBearer (ConnectionId peerAddr) BearerTrace))
-> Tracer
m (WithName name (WithBearer (ConnectionId peerAddr) BearerTrace))
-> Tracer m (WithBearer (ConnectionId peerAddr) BearerTrace)
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer
m (WithName name (WithBearer (ConnectionId peerAddr) BearerTrace))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
}
mkConnectionHandler =
TracersWithBearer (ConnectionId peerAddr) m
-> ForkPolicy peerAddr
-> HandshakeArguments
(ConnectionId peerAddr) UnversionedProtocol DataFlowProtocolData m
-> Versions
UnversionedProtocol
DataFlowProtocolData
(OuroborosBundle
'InitiatorMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
Void)
-> (ThreadId m, RethrowPolicy)
-> MkMuxConnectionHandler
'InitiatorMode
socket
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
peerAddr
UnversionedProtocol
DataFlowProtocolData
ByteString
m
[resp]
Void
-> MuxConnectionHandler
'InitiatorMode
socket
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
peerAddr
UnversionedProtocol
DataFlowProtocolData
ByteString
m
[resp]
Void
forall initiatorCtx responderCtx peerAddr (muxMode :: Mode) socket
versionNumber versionData (m :: * -> *) a b.
(Alternative (STM m), MonadAsync m, MonadDelay m, MonadFork m,
MonadLabelledSTM m, MonadThrow (STM m), MonadTimer m, MonadMask m,
Ord versionNumber, Show peerAddr, Typeable peerAddr) =>
TracersWithBearer (ConnectionId peerAddr) m
-> ForkPolicy peerAddr
-> HandshakeArguments
(ConnectionId peerAddr) versionNumber versionData m
-> Versions
versionNumber
versionData
(OuroborosBundle
muxMode initiatorCtx responderCtx ByteString m a b)
-> (ThreadId m, RethrowPolicy)
-> MkMuxConnectionHandler
muxMode
socket
initiatorCtx
responderCtx
peerAddr
versionNumber
versionData
ByteString
m
a
b
-> MuxConnectionHandler
muxMode
socket
initiatorCtx
responderCtx
peerAddr
versionNumber
versionData
ByteString
m
a
b
makeConnectionHandler
TracersWithBearer (ConnectionId peerAddr) m
muxTracers
ForkPolicy peerAddr
forall peerAddr. ForkPolicy peerAddr
noBindForkPolicy
HandshakeArguments {
haHandshakeTracer :: Tracer
m
(WithBearer
(ConnectionId peerAddr)
(TraceSendRecv (Handshake UnversionedProtocol Term)))
haHandshakeTracer = name
-> WithBearer
(ConnectionId peerAddr)
(TraceSendRecv (Handshake UnversionedProtocol Term))
-> WithName
name
(WithBearer
(ConnectionId peerAddr)
(TraceSendRecv (Handshake UnversionedProtocol Term)))
forall name event. name -> event -> WithName name event
WithName name
name (WithBearer
(ConnectionId peerAddr)
(TraceSendRecv (Handshake UnversionedProtocol Term))
-> WithName
name
(WithBearer
(ConnectionId peerAddr)
(TraceSendRecv (Handshake UnversionedProtocol Term))))
-> Tracer
m
(WithName
name
(WithBearer
(ConnectionId peerAddr)
(TraceSendRecv (Handshake UnversionedProtocol Term))))
-> Tracer
m
(WithBearer
(ConnectionId peerAddr)
(TraceSendRecv (Handshake UnversionedProtocol Term)))
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer
m
(WithName
name
(WithBearer
(ConnectionId peerAddr)
(TraceSendRecv (Handshake UnversionedProtocol Term))))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer,
haBearerTracer :: Tracer m (WithBearer (ConnectionId peerAddr) BearerTrace)
haBearerTracer = name
-> WithBearer (ConnectionId peerAddr) BearerTrace
-> WithName name (WithBearer (ConnectionId peerAddr) BearerTrace)
forall name event. name -> event -> WithName name event
WithName name
name (WithBearer (ConnectionId peerAddr) BearerTrace
-> WithName name (WithBearer (ConnectionId peerAddr) BearerTrace))
-> Tracer
m (WithName name (WithBearer (ConnectionId peerAddr) BearerTrace))
-> Tracer m (WithBearer (ConnectionId peerAddr) BearerTrace)
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer
m (WithName name (WithBearer (ConnectionId peerAddr) BearerTrace))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer,
haHandshakeCodec :: Codec
(Handshake UnversionedProtocol Term)
DeserialiseFailure
m
ByteString
haHandshakeCodec = Codec
(Handshake UnversionedProtocol Term)
DeserialiseFailure
m
ByteString
forall (m :: * -> *).
MonadST m =>
Codec
(Handshake UnversionedProtocol Term)
DeserialiseFailure
m
ByteString
unversionedHandshakeCodec,
haVersionDataCodec :: VersionDataCodec Term UnversionedProtocol DataFlowProtocolData
haVersionDataCodec = (UnversionedProtocol -> CodecCBORTerm Text DataFlowProtocolData)
-> VersionDataCodec Term UnversionedProtocol DataFlowProtocolData
forall vNumber vData.
(vNumber -> CodecCBORTerm Text vData)
-> VersionDataCodec Term vNumber vData
cborTermVersionDataCodec UnversionedProtocol -> CodecCBORTerm Text DataFlowProtocolData
dataFlowProtocolDataCodec,
haAcceptVersion :: DataFlowProtocolData
-> DataFlowProtocolData -> Accept DataFlowProtocolData
haAcceptVersion = DataFlowProtocolData
-> DataFlowProtocolData -> Accept DataFlowProtocolData
forall v. Acceptable v => v -> v -> Accept v
acceptableVersion,
haQueryVersion :: DataFlowProtocolData -> Bool
haQueryVersion = DataFlowProtocolData -> Bool
forall v. Queryable v => v -> Bool
queryVersion,
haTimeLimits :: ProtocolTimeLimits (Handshake UnversionedProtocol Term)
haTimeLimits = ProtocolTimeLimits (Handshake UnversionedProtocol Term)
handshakeTimeLimits
}
(DataFlow
-> OuroborosBundle
'InitiatorMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
Void
-> Versions
UnversionedProtocol
DataFlowProtocolData
(OuroborosBundle
'InitiatorMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
Void)
forall app.
DataFlow
-> app -> Versions UnversionedProtocol DataFlowProtocolData app
dataFlowProtocol DataFlow
Unidirectional OuroborosBundle
'InitiatorMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
Void
clientApplication)
(ThreadId m
mainThreadId, RethrowPolicy
debugMuxErrorRethrowPolicy
RethrowPolicy -> RethrowPolicy -> RethrowPolicy
forall a. Semigroup a => a -> a -> a
<> RethrowPolicy
debugMuxRuntimeErrorRethrowPolicy
RethrowPolicy -> RethrowPolicy -> RethrowPolicy
forall a. Semigroup a => a -> a -> a
<> RethrowPolicy
debugIOErrorRethrowPolicy
RethrowPolicy -> RethrowPolicy -> RethrowPolicy
forall a. Semigroup a => a -> a -> a
<> RethrowPolicy
assertRethrowPolicy)
MkMuxConnectionHandler
'InitiatorMode
socket
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
peerAddr
UnversionedProtocol
DataFlowProtocolData
ByteString
m
[resp]
Void
forall socket initiatorCtx responderCtx peerAddr versionNumber
versionData bytes (m :: * -> *) a b.
MkMuxConnectionHandler
'InitiatorMode
socket
initiatorCtx
responderCtx
peerAddr
versionNumber
versionData
bytes
m
a
b
MuxInitiatorConnectionHandler
CM.with CM.Arguments {
tracer = WithName name
`contramap` tracer,
trTracer = (WithName name . fmap CM.abstractState)
`contramap` trTracer,
ipv4Address = localAddr,
ipv6Address = Nothing,
addressType = \peerAddr
_ -> AddressType -> Maybe AddressType
forall a. a -> Maybe a
Just AddressType
IPv4Address,
snocket,
makeBearer,
withBuffer = \Maybe (ReadBuffer m) -> m ()
f -> Maybe (ReadBuffer m) -> m ()
f Maybe (ReadBuffer m)
forall a. Maybe a
Nothing,
configureSocket = \socket
_ Maybe peerAddr
_ -> () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (),
connectionDataFlow = \(DataFlowProtocolData DataFlow
df PeerSharing
_) -> DataFlow
df,
timeWaitTimeout = tTimeWaitTimeout timeouts,
outboundIdleTimeout = tOutboundIdleTimeout timeouts,
prunePolicy = simplePrunePolicy,
stdGen,
connectionsLimits = acceptedConnLimit,
updateVersionData = \DataFlowProtocolData
a DiffusionMode
_ -> DataFlowProtocolData
a,
connStateIdSupply,
classifyHandlerError = \HandlerError UnversionedProtocol
_ -> HandlerErrorType
HandshakeFailure
}
NotInResponderMode
mkConnectionHandler
\ConnectionManagerWithExpandedCtx
'InitiatorMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
Void
cm ->
ConnectionManagerWithExpandedCtx
'InitiatorMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
Void
-> m a
k ConnectionManagerWithExpandedCtx
'InitiatorMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
Void
cm m a -> (SomeException -> m a) -> m a
forall e a. Exception e => m a -> (e -> m a) -> m a
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` \(SomeException
e :: SomeException) -> SomeException -> m a
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SomeException
e
where
clientApplication :: TemperatureBundle
[MiniProtocol Mx.InitiatorMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString m [resp] Void]
clientApplication :: OuroborosBundle
'InitiatorMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
Void
clientApplication = MiniProtocolNum
-> (ConnectionId peerAddr -> STM m [req])
-> [MiniProtocol
'InitiatorMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
Void]
mkProto (MiniProtocolNum
-> (ConnectionId peerAddr -> STM m [req])
-> [MiniProtocol
'InitiatorMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
Void])
-> TemperatureBundle MiniProtocolNum
-> TemperatureBundle
((ConnectionId peerAddr -> STM m [req])
-> [MiniProtocol
'InitiatorMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
Void])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Word16 -> MiniProtocolNum
Mx.MiniProtocolNum (Word16 -> MiniProtocolNum)
-> TemperatureBundle Word16 -> TemperatureBundle MiniProtocolNum
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TemperatureBundle Word16
nums)
TemperatureBundle
((ConnectionId peerAddr -> STM m [req])
-> [MiniProtocol
'InitiatorMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
Void])
-> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
-> OuroborosBundle
'InitiatorMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
Void
forall a b.
TemperatureBundle (a -> b)
-> TemperatureBundle a -> TemperatureBundle b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
nextRequests
where nums :: TemperatureBundle Word16
nums = WithProtocolTemperature 'Hot Word16
-> WithProtocolTemperature 'Warm Word16
-> WithProtocolTemperature 'Established Word16
-> TemperatureBundle Word16
forall a.
WithProtocolTemperature 'Hot a
-> WithProtocolTemperature 'Warm a
-> WithProtocolTemperature 'Established a
-> TemperatureBundle a
TemperatureBundle (Word16 -> WithProtocolTemperature 'Hot Word16
forall a. a -> WithProtocolTemperature 'Hot a
WithHot Word16
1) (Word16 -> WithProtocolTemperature 'Warm Word16
forall a. a -> WithProtocolTemperature 'Warm a
WithWarm Word16
2) (Word16 -> WithProtocolTemperature 'Established Word16
forall a. a -> WithProtocolTemperature 'Established a
WithEstablished Word16
3)
mkProto :: MiniProtocolNum
-> (ConnectionId peerAddr -> STM m [req])
-> [MiniProtocol
'InitiatorMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
Void]
mkProto MiniProtocolNum
miniProtocolNum ConnectionId peerAddr -> STM m [req]
nextRequest =
[MiniProtocol {
MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum,
miniProtocolStart :: StartOnDemandOrEagerly
miniProtocolStart = StartOnDemandOrEagerly
StartOnDemand,
miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits = Int -> MiniProtocolLimits
Mx.MiniProtocolLimits Int
forall a. Bounded a => a
maxBound,
miniProtocolRun :: RunMiniProtocol
'InitiatorMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
Void
miniProtocolRun = MiniProtocolNum
-> (ConnectionId peerAddr -> STM m [req])
-> RunMiniProtocol
'InitiatorMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
Void
reqRespInitiator MiniProtocolNum
miniProtocolNum
ConnectionId peerAddr -> STM m [req]
nextRequest
}]
reqRespInitiator :: Mx.MiniProtocolNum
-> (ConnectionId peerAddr -> STM m [req])
-> RunMiniProtocol Mx.InitiatorMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString m [resp] Void
reqRespInitiator :: MiniProtocolNum
-> (ConnectionId peerAddr -> STM m [req])
-> RunMiniProtocol
'InitiatorMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
Void
reqRespInitiator MiniProtocolNum
protocolNum ConnectionId peerAddr -> STM m [req]
nextRequest =
MiniProtocolCb
(ExpandedInitiatorContext peerAddr m) ByteString m [resp]
-> RunMiniProtocol
'InitiatorMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
Void
forall initiatorCtx bytes (m :: * -> *) a responderCtx.
MiniProtocolCb initiatorCtx bytes m a
-> RunMiniProtocol
'InitiatorMode initiatorCtx responderCtx bytes m a Void
InitiatorProtocolOnly
((ExpandedInitiatorContext peerAddr m
-> Channel m ByteString -> m ([resp], Maybe ByteString))
-> MiniProtocolCb
(ExpandedInitiatorContext peerAddr m) ByteString m [resp]
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb ((ExpandedInitiatorContext peerAddr m
-> Channel m ByteString -> m ([resp], Maybe ByteString))
-> MiniProtocolCb
(ExpandedInitiatorContext peerAddr m) ByteString m [resp])
-> (ExpandedInitiatorContext peerAddr m
-> Channel m ByteString -> m ([resp], Maybe ByteString))
-> MiniProtocolCb
(ExpandedInitiatorContext peerAddr m) ByteString m [resp]
forall a b. (a -> b) -> a -> b
$ \ExpandedInitiatorContext { eicConnectionId :: forall addr (m :: * -> *).
ExpandedInitiatorContext addr m -> ConnectionId addr
eicConnectionId = ConnectionId peerAddr
connId } Channel m ByteString
channel ->
Tracer m (TraceSendRecv (ReqResp req resp))
-> Codec (ReqResp req resp) DeserialiseFailure m ByteString
-> ProtocolSizeLimits (ReqResp req resp) ByteString
-> ProtocolTimeLimits (ReqResp req resp)
-> Channel m ByteString
-> Peer (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp]
-> m ([resp], Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
MonadTimer m, ShowProxy ps,
forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runPeerWithLimits
((name, String, MiniProtocolNum)
-> TraceSendRecv (ReqResp req resp)
-> WithName
(name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp))
forall name event. name -> event -> WithName name event
WithName (name
name,String
"Initiator",MiniProtocolNum
protocolNum) (TraceSendRecv (ReqResp req resp)
-> WithName
(name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp)))
-> Tracer
m
(WithName
(name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp)))
-> Tracer m (TraceSendRecv (ReqResp req resp))
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer
m
(WithName
(name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp)))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer)
Codec (ReqResp req resp) DeserialiseFailure m ByteString
forall req resp (m :: * -> *).
(MonadST m, Serialise req, Serialise resp) =>
Codec (ReqResp req resp) DeserialiseFailure m ByteString
codecReqResp
ProtocolSizeLimits (ReqResp req resp) ByteString
forall {k} {k1} (req :: k) (resp :: k1).
ProtocolSizeLimits (ReqResp req resp) ByteString
reqRespSizeLimits
ProtocolTimeLimits (ReqResp req resp)
forall {k} {k1} (req :: k) (resp :: k1).
ProtocolTimeLimits (ReqResp req resp)
reqRespTimeLimits
Channel m ByteString
channel
(m (Peer
(ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp])
-> Peer (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp]
forall ps (pl :: IsPipelined) (st :: ps) (m :: * -> *) a.
m (Client ps pl st m a) -> Client ps pl st m a
Effect (m (Peer
(ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp])
-> Peer
(ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp])
-> m (Peer
(ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp])
-> Peer (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp]
forall a b. (a -> b) -> a -> b
$ do
reqs <- STM m [req] -> m [req]
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (ConnectionId peerAddr -> STM m [req]
nextRequest ConnectionId peerAddr
connId)
pure $ reqRespClientPeer (reqRespClientMap reqs)))
debugMuxErrorRethrowPolicy :: RethrowPolicy
debugMuxErrorRethrowPolicy :: RethrowPolicy
debugMuxErrorRethrowPolicy =
(ErrorContext -> Error -> ErrorCommand) -> RethrowPolicy
forall e.
Exception e =>
(ErrorContext -> e -> ErrorCommand) -> RethrowPolicy
mkRethrowPolicy ((ErrorContext -> Error -> ErrorCommand) -> RethrowPolicy)
-> (ErrorContext -> Error -> ErrorCommand) -> RethrowPolicy
forall a b. (a -> b) -> a -> b
$
\ErrorContext
_ Error
e ->
case Error
e of
Mx.IOException {} -> ErrorCommand
ShutdownPeer
Mx.BearerClosed {} -> ErrorCommand
ShutdownPeer
Error
Mx.SDUReadTimeout -> ErrorCommand
ShutdownPeer
Error
Mx.SDUWriteTimeout -> ErrorCommand
ShutdownPeer
Error
_ -> ErrorCommand
ShutdownNode
debugMuxRuntimeErrorRethrowPolicy :: RethrowPolicy
debugMuxRuntimeErrorRethrowPolicy :: RethrowPolicy
debugMuxRuntimeErrorRethrowPolicy =
(ErrorContext -> RuntimeError -> ErrorCommand) -> RethrowPolicy
forall e.
Exception e =>
(ErrorContext -> e -> ErrorCommand) -> RethrowPolicy
mkRethrowPolicy ((ErrorContext -> RuntimeError -> ErrorCommand) -> RethrowPolicy)
-> (ErrorContext -> RuntimeError -> ErrorCommand) -> RethrowPolicy
forall a b. (a -> b) -> a -> b
$
\ErrorContext
_ (RuntimeError
_ :: Mx.RuntimeError) -> ErrorCommand
ShutdownPeer
debugIOErrorRethrowPolicy :: RethrowPolicy
debugIOErrorRethrowPolicy :: RethrowPolicy
debugIOErrorRethrowPolicy =
(ErrorContext -> IOError -> ErrorCommand) -> RethrowPolicy
forall e.
Exception e =>
(ErrorContext -> e -> ErrorCommand) -> RethrowPolicy
mkRethrowPolicy ((ErrorContext -> IOError -> ErrorCommand) -> RethrowPolicy)
-> (ErrorContext -> IOError -> ErrorCommand) -> RethrowPolicy
forall a b. (a -> b) -> a -> b
$
\ErrorContext
_ (IOError
_ :: IOError) -> ErrorCommand
ShutdownPeer
assertRethrowPolicy :: RethrowPolicy
assertRethrowPolicy :: RethrowPolicy
assertRethrowPolicy =
(ErrorContext -> AssertionFailed -> ErrorCommand) -> RethrowPolicy
forall e.
Exception e =>
(ErrorContext -> e -> ErrorCommand) -> RethrowPolicy
mkRethrowPolicy ((ErrorContext -> AssertionFailed -> ErrorCommand)
-> RethrowPolicy)
-> (ErrorContext -> AssertionFailed -> ErrorCommand)
-> RethrowPolicy
forall a b. (a -> b) -> a -> b
$
\ErrorContext
_ (AssertionFailed
_ :: AssertionFailed) -> ErrorCommand
ShutdownNode
type WithNameAndBearer name addr = Compose (WithName name) (Mx.WithBearer (ConnectionId addr))
withBidirectionalConnectionManager
:: forall name peerAddr socket acc req resp m a.
( ConnectionManagerMonad m
, acc ~ [req], resp ~ [req]
, Ord peerAddr, Show peerAddr, Typeable peerAddr
, Serialise req, Typeable req
, MonadAsync m
, MonadDelay m
, MonadFix m
, MonadLabelledSTM m
, MonadTraceSTM m
, MonadSay m, Show req
, Show name
)
=> name
-> Timeouts
-> Tracer m (WithName name (RemoteTransitionTrace peerAddr))
-> Tracer m (WithName name (AbstractTransitionTrace CM.ConnStateId))
-> Tracer m (WithName name
(CM.Trace
peerAddr
(ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
-> Tracer m (WithName name (InboundGovernor.Trace peerAddr))
-> Mx.Tracers' m (WithNameAndBearer name peerAddr)
-> Tracer m (WithName name (InboundGovernor.Debug peerAddr DataFlowProtocolData))
-> StdGen
-> Snocket m socket peerAddr
-> Mx.MakeBearer m socket
-> CM.ConnStateIdSupply m
-> (socket -> m ())
-> socket
-> Maybe peerAddr
-> acc
-> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
-> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
-> AcceptedConnectionsLimit
-> (ConnectionManagerWithExpandedCtx
Mx.InitiatorResponderMode socket peerAddr
DataFlowProtocolData UnversionedProtocol ByteString m [resp] acc
-> peerAddr
-> Async m Void
-> m a)
-> m a
withBidirectionalConnectionManager :: forall name peerAddr socket acc req resp (m :: * -> *) a.
(ConnectionManagerMonad m, acc ~ [req], resp ~ [req], Ord peerAddr,
Show peerAddr, Typeable peerAddr, Serialise req, Typeable req,
MonadAsync m, MonadDelay m, MonadFix m, MonadLabelledSTM m,
MonadTraceSTM m, MonadSay m, Show req, Show name) =>
name
-> Timeouts
-> Tracer m (WithName name (RemoteTransitionTrace peerAddr))
-> Tracer m (WithName name (AbstractTransitionTrace ConnStateId))
-> Tracer
m
(WithName
name
(Trace
peerAddr
(ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
-> Tracer m (WithName name (Trace peerAddr))
-> Tracers' m (WithNameAndBearer name peerAddr)
-> Tracer m (WithName name (Debug peerAddr DataFlowProtocolData))
-> StdGen
-> Snocket m socket peerAddr
-> MakeBearer m socket
-> ConnStateIdSupply m
-> (socket -> m ())
-> socket
-> Maybe peerAddr
-> acc
-> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
-> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
-> AcceptedConnectionsLimit
-> (ConnectionManagerWithExpandedCtx
'InitiatorResponderMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
acc
-> peerAddr -> Async m Void -> m a)
-> m a
withBidirectionalConnectionManager name
name Timeouts
timeouts
Tracer m (WithName name (RemoteTransitionTrace peerAddr))
inboundTrTracer Tracer m (WithName name (AbstractTransitionTrace ConnStateId))
trTracer
Tracer
m
(WithName
name
(Trace
peerAddr
(ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
tracer Tracer m (WithName name (Trace peerAddr))
inboundTracer Tracers' m (WithNameAndBearer name peerAddr)
muxTracer Tracer m (WithName name (Debug peerAddr DataFlowProtocolData))
debugTracer
StdGen
stdGen
Snocket m socket peerAddr
snocket MakeBearer m socket
makeBearer ConnStateIdSupply m
connStateIdSupply
socket -> m ()
confSock socket
socket
Maybe peerAddr
localAddress
acc
accumulatorInit TemperatureBundle (ConnectionId peerAddr -> STM m [req])
nextRequests
ProtocolTimeLimits (Handshake UnversionedProtocol Term)
handshakeTimeLimits
AcceptedConnectionsLimit
acceptedConnLimit ConnectionManagerWithExpandedCtx
'InitiatorResponderMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
acc
-> peerAddr -> Async m Void -> m a
k = do
mainThreadId <- m (ThreadId m)
forall (m :: * -> *). MonadThread m => m (ThreadId m)
myThreadId
inbgovInfoChannel <- newInformationChannel
let mkConnectionHandler =
TracersWithBearer (ConnectionId peerAddr) m
-> ForkPolicy peerAddr
-> HandshakeArguments
(ConnectionId peerAddr) UnversionedProtocol DataFlowProtocolData m
-> Versions
UnversionedProtocol
DataFlowProtocolData
(OuroborosBundle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
acc)
-> (ThreadId m, RethrowPolicy)
-> MkMuxConnectionHandler
'InitiatorResponderMode
socket
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
peerAddr
UnversionedProtocol
DataFlowProtocolData
ByteString
m
[resp]
acc
-> MuxConnectionHandler
'InitiatorResponderMode
socket
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
peerAddr
UnversionedProtocol
DataFlowProtocolData
ByteString
m
[resp]
acc
forall initiatorCtx responderCtx peerAddr (muxMode :: Mode) socket
versionNumber versionData (m :: * -> *) a b.
(Alternative (STM m), MonadAsync m, MonadDelay m, MonadFork m,
MonadLabelledSTM m, MonadThrow (STM m), MonadTimer m, MonadMask m,
Ord versionNumber, Show peerAddr, Typeable peerAddr) =>
TracersWithBearer (ConnectionId peerAddr) m
-> ForkPolicy peerAddr
-> HandshakeArguments
(ConnectionId peerAddr) versionNumber versionData m
-> Versions
versionNumber
versionData
(OuroborosBundle
muxMode initiatorCtx responderCtx ByteString m a b)
-> (ThreadId m, RethrowPolicy)
-> MkMuxConnectionHandler
muxMode
socket
initiatorCtx
responderCtx
peerAddr
versionNumber
versionData
ByteString
m
a
b
-> MuxConnectionHandler
muxMode
socket
initiatorCtx
responderCtx
peerAddr
versionNumber
versionData
ByteString
m
a
b
makeConnectionHandler
((WithName name (WithBearer (ConnectionId peerAddr) x)
-> WithNameAndBearer name peerAddr x
forall {k} {k1} (f :: k -> *) (g :: k1 -> k) (a :: k1).
f (g a) -> Compose f g a
Compose (WithName name (WithBearer (ConnectionId peerAddr) x)
-> WithNameAndBearer name peerAddr x)
-> (WithBearer (ConnectionId peerAddr) x
-> WithName name (WithBearer (ConnectionId peerAddr) x))
-> WithBearer (ConnectionId peerAddr) x
-> WithNameAndBearer name peerAddr x
forall b c a. (b -> c) -> (a -> b) -> a -> c
. name
-> WithBearer (ConnectionId peerAddr) x
-> WithName name (WithBearer (ConnectionId peerAddr) x)
forall name event. name -> event -> WithName name event
WithName name
name) (forall {x}.
WithBearer (ConnectionId peerAddr) x
-> WithNameAndBearer name peerAddr x)
-> Tracers' m (WithNameAndBearer name peerAddr)
-> TracersWithBearer (ConnectionId peerAddr) m
forall (f' :: * -> *) (f :: * -> *) (m :: * -> *).
(forall x. f' x -> f x) -> Tracers' m f -> Tracers' m f'
`Mx.contramapTracers'` Tracers' m (WithNameAndBearer name peerAddr)
muxTracer)
ForkPolicy peerAddr
forall peerAddr. ForkPolicy peerAddr
noBindForkPolicy
HandshakeArguments {
haHandshakeTracer :: Tracer
m
(WithBearer
(ConnectionId peerAddr)
(TraceSendRecv (Handshake UnversionedProtocol Term)))
haHandshakeTracer = name
-> WithBearer
(ConnectionId peerAddr)
(TraceSendRecv (Handshake UnversionedProtocol Term))
-> WithName
name
(WithBearer
(ConnectionId peerAddr)
(TraceSendRecv (Handshake UnversionedProtocol Term)))
forall name event. name -> event -> WithName name event
WithName name
name (WithBearer
(ConnectionId peerAddr)
(TraceSendRecv (Handshake UnversionedProtocol Term))
-> WithName
name
(WithBearer
(ConnectionId peerAddr)
(TraceSendRecv (Handshake UnversionedProtocol Term))))
-> Tracer
m
(WithName
name
(WithBearer
(ConnectionId peerAddr)
(TraceSendRecv (Handshake UnversionedProtocol Term))))
-> Tracer
m
(WithBearer
(ConnectionId peerAddr)
(TraceSendRecv (Handshake UnversionedProtocol Term)))
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer
m
(WithName
name
(WithBearer
(ConnectionId peerAddr)
(TraceSendRecv (Handshake UnversionedProtocol Term))))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer,
haBearerTracer :: Tracer m (WithBearer (ConnectionId peerAddr) BearerTrace)
haBearerTracer = WithBearer (ConnectionId peerAddr) BearerTrace
-> ZonkAny 0
-> WithName
(WithBearer (ConnectionId peerAddr) BearerTrace) (ZonkAny 0)
forall name event. name -> event -> WithName name event
WithName (WithBearer (ConnectionId peerAddr) BearerTrace
-> ZonkAny 0
-> WithName
(WithBearer (ConnectionId peerAddr) BearerTrace) (ZonkAny 0))
-> Tracer
m
(ZonkAny 0
-> WithName
(WithBearer (ConnectionId peerAddr) BearerTrace) (ZonkAny 0))
-> Tracer m (WithBearer (ConnectionId peerAddr) BearerTrace)
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer
m
(ZonkAny 0
-> WithName
(WithBearer (ConnectionId peerAddr) BearerTrace) (ZonkAny 0))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer,
haHandshakeCodec :: Codec
(Handshake UnversionedProtocol Term)
DeserialiseFailure
m
ByteString
haHandshakeCodec = Codec
(Handshake UnversionedProtocol Term)
DeserialiseFailure
m
ByteString
forall (m :: * -> *).
MonadST m =>
Codec
(Handshake UnversionedProtocol Term)
DeserialiseFailure
m
ByteString
unversionedHandshakeCodec,
haVersionDataCodec :: VersionDataCodec Term UnversionedProtocol DataFlowProtocolData
haVersionDataCodec = (UnversionedProtocol -> CodecCBORTerm Text DataFlowProtocolData)
-> VersionDataCodec Term UnversionedProtocol DataFlowProtocolData
forall vNumber vData.
(vNumber -> CodecCBORTerm Text vData)
-> VersionDataCodec Term vNumber vData
cborTermVersionDataCodec UnversionedProtocol -> CodecCBORTerm Text DataFlowProtocolData
dataFlowProtocolDataCodec,
haAcceptVersion :: DataFlowProtocolData
-> DataFlowProtocolData -> Accept DataFlowProtocolData
haAcceptVersion = DataFlowProtocolData
-> DataFlowProtocolData -> Accept DataFlowProtocolData
forall v. Acceptable v => v -> v -> Accept v
acceptableVersion,
haQueryVersion :: DataFlowProtocolData -> Bool
haQueryVersion = DataFlowProtocolData -> Bool
forall v. Queryable v => v -> Bool
queryVersion,
haTimeLimits :: ProtocolTimeLimits (Handshake UnversionedProtocol Term)
haTimeLimits = ProtocolTimeLimits (Handshake UnversionedProtocol Term)
handshakeTimeLimits
}
(DataFlow
-> OuroborosBundle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
acc
-> Versions
UnversionedProtocol
DataFlowProtocolData
(OuroborosBundle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
acc)
forall app.
DataFlow
-> app -> Versions UnversionedProtocol DataFlowProtocolData app
dataFlowProtocol DataFlow
Duplex OuroborosBundle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
acc
serverApplication)
(ThreadId m
mainThreadId, RethrowPolicy
debugMuxErrorRethrowPolicy
RethrowPolicy -> RethrowPolicy -> RethrowPolicy
forall a. Semigroup a => a -> a -> a
<> RethrowPolicy
debugMuxRuntimeErrorRethrowPolicy
RethrowPolicy -> RethrowPolicy -> RethrowPolicy
forall a. Semigroup a => a -> a -> a
<> RethrowPolicy
debugIOErrorRethrowPolicy
RethrowPolicy -> RethrowPolicy -> RethrowPolicy
forall a. Semigroup a => a -> a -> a
<> RethrowPolicy
assertRethrowPolicy)
withConnectionManager MuxConnectionHandler
'InitiatorResponderMode
socket
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
peerAddr
UnversionedProtocol
DataFlowProtocolData
ByteString
m
[resp]
acc
connectionHandler ConnectionManagerWithExpandedCtx
'InitiatorResponderMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
acc
-> m a
k' =
Arguments
(ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)
socket
peerAddr
(Handle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
DataFlowProtocolData
ByteString
m
[resp]
acc)
(HandlerError UnversionedProtocol)
UnversionedProtocol
DataFlowProtocolData
m
[resp]
acc
-> InResponderMode
'InitiatorResponderMode
(InformationChannel
(Event
'InitiatorResponderMode
(Handle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
DataFlowProtocolData
ByteString
m
[resp]
acc)
(ExpandedInitiatorContext peerAddr m)
peerAddr
DataFlowProtocolData
m
[resp]
acc)
m)
-> MuxConnectionHandler
'InitiatorResponderMode
socket
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
peerAddr
UnversionedProtocol
DataFlowProtocolData
ByteString
m
[resp]
acc
-> (ConnectionManagerWithExpandedCtx
'InitiatorResponderMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
acc
-> m a)
-> m a
forall (muxMode :: Mode) peerAddr socket initiatorCtx handlerTrace
handle handleError version versionData (m :: * -> *) a b x.
(Alternative (STM m), MonadLabelledSTM m, MonadTraceSTM m,
MonadFork m, MonadAsync m, MonadDelay m, MonadEvaluate m,
MonadFix m, MonadMask m, MonadThrow (STM m), MonadTimer m,
Ord peerAddr, Show peerAddr, Typeable peerAddr) =>
Arguments
handlerTrace
socket
peerAddr
handle
handleError
version
versionData
m
a
b
-> InResponderMode
muxMode
(InformationChannel
(Event muxMode handle initiatorCtx peerAddr versionData m a b) m)
-> ConnectionHandler
muxMode
handlerTrace
socket
peerAddr
handle
handleError
version
versionData
m
-> (ConnectionManager muxMode socket peerAddr handle handleError m
-> m x)
-> m x
CM.with CM.Arguments {
tracer :: Tracer
m
(Trace
peerAddr
(ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData))
tracer = name
-> Trace
peerAddr
(ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)
-> WithName
name
(Trace
peerAddr
(ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData))
forall name event. name -> event -> WithName name event
WithName name
name
(Trace
peerAddr
(ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)
-> WithName
name
(Trace
peerAddr
(ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
-> Tracer
m
(WithName
name
(Trace
peerAddr
(ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
-> Tracer
m
(Trace
peerAddr
(ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData))
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer
m
(WithName
name
(Trace
peerAddr
(ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
tracer,
trTracer :: Tracer
m
(TransitionTrace
ConnStateId
(ConnectionState
peerAddr
(Handle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
DataFlowProtocolData
ByteString
m
[resp]
acc)
(HandlerError UnversionedProtocol)
UnversionedProtocol
m))
trTracer = (name
-> AbstractTransitionTrace ConnStateId
-> WithName name (AbstractTransitionTrace ConnStateId)
forall name event. name -> event -> WithName name event
WithName name
name (AbstractTransitionTrace ConnStateId
-> WithName name (AbstractTransitionTrace ConnStateId))
-> (TransitionTrace
ConnStateId
(ConnectionState
peerAddr
(Handle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
DataFlowProtocolData
ByteString
m
[resp]
acc)
(HandlerError UnversionedProtocol)
UnversionedProtocol
m)
-> AbstractTransitionTrace ConnStateId)
-> TransitionTrace
ConnStateId
(ConnectionState
peerAddr
(Handle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
DataFlowProtocolData
ByteString
m
[resp]
acc)
(HandlerError UnversionedProtocol)
UnversionedProtocol
m)
-> WithName name (AbstractTransitionTrace ConnStateId)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (MaybeUnknown
(ConnectionState
peerAddr
(Handle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
DataFlowProtocolData
ByteString
m
[resp]
acc)
(HandlerError UnversionedProtocol)
UnversionedProtocol
m)
-> AbstractState)
-> TransitionTrace
ConnStateId
(ConnectionState
peerAddr
(Handle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
DataFlowProtocolData
ByteString
m
[resp]
acc)
(HandlerError UnversionedProtocol)
UnversionedProtocol
m)
-> AbstractTransitionTrace ConnStateId
forall a b.
(a -> b)
-> TransitionTrace' ConnStateId a -> TransitionTrace' ConnStateId b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap MaybeUnknown
(ConnectionState
peerAddr
(Handle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
DataFlowProtocolData
ByteString
m
[resp]
acc)
(HandlerError UnversionedProtocol)
UnversionedProtocol
m)
-> AbstractState
forall muxMode peerAddr m a (b :: * -> *).
MaybeUnknown (ConnectionState muxMode peerAddr m a b)
-> AbstractState
CM.abstractState)
(TransitionTrace
ConnStateId
(ConnectionState
peerAddr
(Handle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
DataFlowProtocolData
ByteString
m
[resp]
acc)
(HandlerError UnversionedProtocol)
UnversionedProtocol
m)
-> WithName name (AbstractTransitionTrace ConnStateId))
-> Tracer m (WithName name (AbstractTransitionTrace ConnStateId))
-> Tracer
m
(TransitionTrace
ConnStateId
(ConnectionState
peerAddr
(Handle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
DataFlowProtocolData
ByteString
m
[resp]
acc)
(HandlerError UnversionedProtocol)
UnversionedProtocol
m))
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer m (WithName name (AbstractTransitionTrace ConnStateId))
trTracer,
ipv4Address :: Maybe peerAddr
ipv4Address = Maybe peerAddr
localAddress,
ipv6Address :: Maybe peerAddr
ipv6Address = Maybe peerAddr
forall a. Maybe a
Nothing,
addressType :: peerAddr -> Maybe AddressType
addressType = \peerAddr
_ -> AddressType -> Maybe AddressType
forall a. a -> Maybe a
Just AddressType
IPv4Address,
Snocket m socket peerAddr
snocket :: Snocket m socket peerAddr
snocket :: Snocket m socket peerAddr
snocket,
MakeBearer m socket
makeBearer :: MakeBearer m socket
makeBearer :: MakeBearer m socket
makeBearer,
withBuffer :: (Maybe (ReadBuffer m) -> m ()) -> m ()
withBuffer = \Maybe (ReadBuffer m) -> m ()
f -> Maybe (ReadBuffer m) -> m ()
f Maybe (ReadBuffer m)
forall a. Maybe a
Nothing,
configureSocket :: socket -> Maybe peerAddr -> m ()
configureSocket = \socket
sock Maybe peerAddr
_ -> socket -> m ()
confSock socket
sock,
connectionDataFlow :: DataFlowProtocolData -> DataFlow
connectionDataFlow = \(DataFlowProtocolData DataFlow
df PeerSharing
_) -> DataFlow
df,
timeWaitTimeout :: DiffTime
timeWaitTimeout = Timeouts -> DiffTime
tTimeWaitTimeout Timeouts
timeouts,
outboundIdleTimeout :: DiffTime
outboundIdleTimeout = Timeouts -> DiffTime
tOutboundIdleTimeout Timeouts
timeouts,
prunePolicy :: PrunePolicy peerAddr
prunePolicy = PrunePolicy peerAddr
forall peerAddr. Ord peerAddr => PrunePolicy peerAddr
simplePrunePolicy,
StdGen
stdGen :: StdGen
stdGen :: StdGen
stdGen,
connectionsLimits :: AcceptedConnectionsLimit
connectionsLimits = AcceptedConnectionsLimit
acceptedConnLimit,
updateVersionData :: DataFlowProtocolData -> DiffusionMode -> DataFlowProtocolData
updateVersionData = \DataFlowProtocolData
versionData DiffusionMode
diffusionMode ->
DataFlowProtocolData
versionData { getProtocolDataFlow =
case diffusionMode of
DiffusionMode
InitiatorOnlyDiffusionMode -> DataFlow
Unidirectional
DiffusionMode
InitiatorAndResponderDiffusionMode -> DataFlow
Duplex
},
ConnStateIdSupply m
connStateIdSupply :: ConnStateIdSupply m
connStateIdSupply :: ConnStateIdSupply m
connStateIdSupply,
classifyHandlerError :: HandlerError UnversionedProtocol -> HandlerErrorType
classifyHandlerError = (\HandlerError UnversionedProtocol
_ -> HandlerErrorType
HandshakeFailure)
}
(InformationChannel
(Event
'InitiatorResponderMode
(Handle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
DataFlowProtocolData
ByteString
m
[resp]
acc)
(ExpandedInitiatorContext peerAddr m)
peerAddr
DataFlowProtocolData
m
[resp]
acc)
m
-> InResponderMode
'InitiatorResponderMode
(InformationChannel
(Event
'InitiatorResponderMode
(Handle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
DataFlowProtocolData
ByteString
m
[resp]
acc)
(ExpandedInitiatorContext peerAddr m)
peerAddr
DataFlowProtocolData
m
[resp]
acc)
m)
forall (mode :: Mode) a.
(HasResponder mode ~ 'True) =>
a -> InResponderMode mode a
InResponderMode InformationChannel
(Event
'InitiatorResponderMode
(Handle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
DataFlowProtocolData
ByteString
m
[resp]
acc)
(ExpandedInitiatorContext peerAddr m)
peerAddr
DataFlowProtocolData
m
[resp]
acc)
m
inbgovInfoChannel)
MuxConnectionHandler
'InitiatorResponderMode
socket
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
peerAddr
UnversionedProtocol
DataFlowProtocolData
ByteString
m
[resp]
acc
connectionHandler
ConnectionManagerWithExpandedCtx
'InitiatorResponderMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
acc
-> m a
k'
serverAddr <- Snocket.getLocalAddr snocket socket
handle (\(SomeException
e :: SomeException) -> SomeException -> m a
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SomeException
e) $
Server.with
Server.Arguments {
sockets = socket :| [],
snocket = snocket,
tracer =
WithName name `contramap` nullTracer,
connectionLimits = acceptedConnLimit,
inboundGovernorArgs =
InboundGovernor.Arguments {
transitionTracer =
WithName name `contramap` inboundTrTracer,
tracer =
WithName name `contramap` inboundTracer,
debugTracer =
WithName name `contramap` debugTracer,
connectionDataFlow = \(DataFlowProtocolData DataFlow
df PeerSharing
_) -> DataFlow
df,
infoChannel = inbgovInfoChannel,
idleTimeout = Just (tProtocolIdleTimeout timeouts),
withConnectionManager,
mkConnectionHandler = mkConnectionHandler . MuxInitiatorResponderConnectionHandler (\(DataFlowProtocolData DataFlow
df PeerSharing
_) -> DataFlow
df)
}
}
(\Async m Void
inboundGovernorAsync m (PublicState peerAddr DataFlowProtocolData)
_ ConnectionManagerWithExpandedCtx
'InitiatorResponderMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
acc
connectionManager -> ConnectionManagerWithExpandedCtx
'InitiatorResponderMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
acc
-> peerAddr -> Async m Void -> m a
k ConnectionManagerWithExpandedCtx
'InitiatorResponderMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
acc
connectionManager peerAddr
serverAddr Async m Void
inboundGovernorAsync)
where
serverApplication :: TemperatureBundle
[MiniProtocol Mx.InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString m [resp] acc]
serverApplication :: OuroborosBundle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
acc
serverApplication = MiniProtocolNum
-> (ConnectionId peerAddr -> STM m [req])
-> [MiniProtocol
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
acc]
mkProto (MiniProtocolNum
-> (ConnectionId peerAddr -> STM m [req])
-> [MiniProtocol
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
acc])
-> TemperatureBundle MiniProtocolNum
-> TemperatureBundle
((ConnectionId peerAddr -> STM m [req])
-> [MiniProtocol
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
acc])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Word16 -> MiniProtocolNum
Mx.MiniProtocolNum (Word16 -> MiniProtocolNum)
-> TemperatureBundle Word16 -> TemperatureBundle MiniProtocolNum
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TemperatureBundle Word16
nums) TemperatureBundle
((ConnectionId peerAddr -> STM m [req])
-> [MiniProtocol
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
acc])
-> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
-> OuroborosBundle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
acc
forall a b.
TemperatureBundle (a -> b)
-> TemperatureBundle a -> TemperatureBundle b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
nextRequests
where nums :: TemperatureBundle Word16
nums = WithProtocolTemperature 'Hot Word16
-> WithProtocolTemperature 'Warm Word16
-> WithProtocolTemperature 'Established Word16
-> TemperatureBundle Word16
forall a.
WithProtocolTemperature 'Hot a
-> WithProtocolTemperature 'Warm a
-> WithProtocolTemperature 'Established a
-> TemperatureBundle a
TemperatureBundle (Word16 -> WithProtocolTemperature 'Hot Word16
forall a. a -> WithProtocolTemperature 'Hot a
WithHot Word16
1) (Word16 -> WithProtocolTemperature 'Warm Word16
forall a. a -> WithProtocolTemperature 'Warm a
WithWarm Word16
2) (Word16 -> WithProtocolTemperature 'Established Word16
forall a. a -> WithProtocolTemperature 'Established a
WithEstablished Word16
3)
mkProto :: MiniProtocolNum
-> (ConnectionId peerAddr -> STM m [req])
-> [MiniProtocol
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
acc]
mkProto MiniProtocolNum
miniProtocolNum ConnectionId peerAddr -> STM m [req]
nextRequest =
[MiniProtocol {
MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum,
miniProtocolStart :: StartOnDemandOrEagerly
miniProtocolStart = StartOnDemandOrEagerly
Mx.StartOnDemand,
miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits = Int -> MiniProtocolLimits
Mx.MiniProtocolLimits Int
forall a. Bounded a => a
maxBound,
miniProtocolRun :: RunMiniProtocol
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
acc
miniProtocolRun = MiniProtocolNum
-> acc
-> (ConnectionId peerAddr -> STM m [req])
-> RunMiniProtocol
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
acc
reqRespInitiatorAndResponder
MiniProtocolNum
miniProtocolNum
acc
accumulatorInit
ConnectionId peerAddr -> STM m [req]
nextRequest
}]
reqRespInitiatorAndResponder
:: Mx.MiniProtocolNum
-> acc
-> (ConnectionId peerAddr -> STM m [req])
-> RunMiniProtocol Mx.InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString m [resp] acc
reqRespInitiatorAndResponder :: MiniProtocolNum
-> acc
-> (ConnectionId peerAddr -> STM m [req])
-> RunMiniProtocol
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
acc
reqRespInitiatorAndResponder MiniProtocolNum
protocolNum acc
accInit ConnectionId peerAddr -> STM m [req]
nextRequest =
MiniProtocolCb
(ExpandedInitiatorContext peerAddr m) ByteString m [resp]
-> MiniProtocolCb (ResponderContext peerAddr) ByteString m acc
-> RunMiniProtocol
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
acc
forall initiatorCtx bytes (m :: * -> *) a responderCtx b.
MiniProtocolCb initiatorCtx bytes m a
-> MiniProtocolCb responderCtx bytes m b
-> RunMiniProtocol
'InitiatorResponderMode initiatorCtx responderCtx bytes m a b
InitiatorAndResponderProtocol
((ExpandedInitiatorContext peerAddr m
-> Channel m ByteString -> m ([resp], Maybe ByteString))
-> MiniProtocolCb
(ExpandedInitiatorContext peerAddr m) ByteString m [resp]
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb ((ExpandedInitiatorContext peerAddr m
-> Channel m ByteString -> m ([resp], Maybe ByteString))
-> MiniProtocolCb
(ExpandedInitiatorContext peerAddr m) ByteString m [resp])
-> (ExpandedInitiatorContext peerAddr m
-> Channel m ByteString -> m ([resp], Maybe ByteString))
-> MiniProtocolCb
(ExpandedInitiatorContext peerAddr m) ByteString m [resp]
forall a b. (a -> b) -> a -> b
$ \ExpandedInitiatorContext { eicConnectionId :: forall addr (m :: * -> *).
ExpandedInitiatorContext addr m -> ConnectionId addr
eicConnectionId = ConnectionId peerAddr
connId } Channel m ByteString
channel ->
Tracer m (TraceSendRecv (ReqResp req resp))
-> Codec (ReqResp req resp) DeserialiseFailure m ByteString
-> ProtocolSizeLimits (ReqResp req resp) ByteString
-> ProtocolTimeLimits (ReqResp req resp)
-> Channel m ByteString
-> Peer (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp]
-> m ([resp], Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
MonadTimer m, ShowProxy ps,
forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runPeerWithLimits
((name, String, MiniProtocolNum)
-> TraceSendRecv (ReqResp req resp)
-> WithName
(name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp))
forall name event. name -> event -> WithName name event
WithName (name
name,String
"Initiator",MiniProtocolNum
protocolNum) (TraceSendRecv (ReqResp req resp)
-> WithName
(name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp)))
-> Tracer
m
(WithName
(name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp)))
-> Tracer m (TraceSendRecv (ReqResp req resp))
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer
m
(WithName
(name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp)))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer)
Codec (ReqResp req resp) DeserialiseFailure m ByteString
forall req resp (m :: * -> *).
(MonadST m, Serialise req, Serialise resp) =>
Codec (ReqResp req resp) DeserialiseFailure m ByteString
codecReqResp
ProtocolSizeLimits (ReqResp req resp) ByteString
forall {k} {k1} (req :: k) (resp :: k1).
ProtocolSizeLimits (ReqResp req resp) ByteString
reqRespSizeLimits
ProtocolTimeLimits (ReqResp req resp)
forall {k} {k1} (req :: k) (resp :: k1).
ProtocolTimeLimits (ReqResp req resp)
reqRespTimeLimits
Channel m ByteString
channel
(m (Peer
(ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp])
-> Peer (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp]
forall ps (pl :: IsPipelined) (st :: ps) (m :: * -> *) a.
m (Client ps pl st m a) -> Client ps pl st m a
Effect (m (Peer
(ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp])
-> Peer
(ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp])
-> m (Peer
(ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp])
-> Peer (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp]
forall a b. (a -> b) -> a -> b
$ do
reqs <- STM m [req] -> m [req]
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (ConnectionId peerAddr -> STM m [req]
nextRequest ConnectionId peerAddr
connId)
pure $ reqRespClientPeer (reqRespClientMap reqs)))
((ResponderContext peerAddr
-> Channel m ByteString -> m (acc, Maybe ByteString))
-> MiniProtocolCb (ResponderContext peerAddr) ByteString m acc
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb ((ResponderContext peerAddr
-> Channel m ByteString -> m (acc, Maybe ByteString))
-> MiniProtocolCb (ResponderContext peerAddr) ByteString m acc)
-> (ResponderContext peerAddr
-> Channel m ByteString -> m (acc, Maybe ByteString))
-> MiniProtocolCb (ResponderContext peerAddr) ByteString m acc
forall a b. (a -> b) -> a -> b
$ \ResponderContext peerAddr
_ctx Channel m ByteString
channel ->
Tracer m (TraceSendRecv (ReqResp req resp))
-> Codec (ReqResp req resp) DeserialiseFailure m ByteString
-> ProtocolSizeLimits (ReqResp req resp) ByteString
-> ProtocolTimeLimits (ReqResp req resp)
-> Channel m ByteString
-> Peer (ReqResp req resp) 'AsServer 'NonPipelined 'StIdle m acc
-> m (acc, Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
MonadTimer m, ShowProxy ps,
forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runPeerWithLimits
((name, String, MiniProtocolNum)
-> TraceSendRecv (ReqResp req resp)
-> WithName
(name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp))
forall name event. name -> event -> WithName name event
WithName (name
name,String
"Responder",MiniProtocolNum
protocolNum) (TraceSendRecv (ReqResp req resp)
-> WithName
(name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp)))
-> Tracer
m
(WithName
(name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp)))
-> Tracer m (TraceSendRecv (ReqResp req resp))
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer
m
(WithName
(name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp)))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer)
Codec (ReqResp req resp) DeserialiseFailure m ByteString
forall req resp (m :: * -> *).
(MonadST m, Serialise req, Serialise resp) =>
Codec (ReqResp req resp) DeserialiseFailure m ByteString
codecReqResp
ProtocolSizeLimits (ReqResp req resp) ByteString
forall {k} {k1} (req :: k) (resp :: k1).
ProtocolSizeLimits (ReqResp req resp) ByteString
reqRespSizeLimits
ProtocolTimeLimits (ReqResp req resp)
forall {k} {k1} (req :: k) (resp :: k1).
ProtocolTimeLimits (ReqResp req resp)
reqRespTimeLimits
Channel m ByteString
channel
(ReqRespServer req resp m acc
-> Peer (ReqResp req resp) 'AsServer 'NonPipelined 'StIdle m acc
forall (m :: * -> *) req resp a.
Monad m =>
ReqRespServer req resp m a
-> Server (ReqResp req resp) 'NonPipelined 'StIdle m a
reqRespServerPeer (ReqRespServer req resp m acc
-> Peer (ReqResp req resp) 'AsServer 'NonPipelined 'StIdle m acc)
-> ReqRespServer req resp m acc
-> Peer (ReqResp req resp) 'AsServer 'NonPipelined 'StIdle m acc
forall a b. (a -> b) -> a -> b
$ acc -> ReqRespServer req resp m acc
reqRespServerMapAccumL' acc
accInit))
reqRespServerMapAccumL' :: acc -> ReqRespServer req resp m acc
reqRespServerMapAccumL' :: acc -> ReqRespServer req resp m acc
reqRespServerMapAccumL' = acc -> ReqRespServer req resp m acc
[req] -> ReqRespServer req [req] m [req]
forall {m :: * -> *} {a}.
Monad m =>
[a] -> ReqRespServer a [a] m [a]
go
where
fn :: [a] -> a -> ([a], [a])
fn [a]
acc a
x = (a
x a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
acc, a
x a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
acc)
go :: [a] -> ReqRespServer a [a] m [a]
go [a]
acc =
ReqRespServer {
recvMsgReq :: a -> m ([a], ReqRespServer a [a] m [a])
recvMsgReq = \a
req ->
let ([a]
acc', [a]
resp) = [a] -> a -> ([a], [a])
forall {a}. [a] -> a -> ([a], [a])
fn [a]
acc a
req
in ([a], ReqRespServer a [a] m [a])
-> m ([a], ReqRespServer a [a] m [a])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ([a]
resp, [a] -> ReqRespServer a [a] m [a]
go [a]
acc'),
recvMsgDone :: m [a]
recvMsgDone = [a] -> m [a]
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return [a]
acc
}
reqRespSizeLimits :: forall req resp. ProtocolSizeLimits (ReqResp req resp)
ByteString
reqRespSizeLimits :: forall {k} {k1} (req :: k) (resp :: k1).
ProtocolSizeLimits (ReqResp req resp) ByteString
reqRespSizeLimits = ProtocolSizeLimits
{ StateToken st -> Word
forall (st :: ReqResp req resp).
ActiveState st =>
StateToken st -> Word
forall (st :: ReqResp req resp). StateToken st -> Word
sizeLimitForState :: forall (st :: ReqResp req resp). StateToken st -> Word
sizeLimitForState :: forall (st :: ReqResp req resp).
ActiveState st =>
StateToken st -> Word
sizeLimitForState
, dataSize :: ByteString -> Word
dataSize = Int64 -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> Word) -> (ByteString -> Int64) -> ByteString -> Word
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Int64
LBS.length
}
where
sizeLimitForState :: forall (st :: ReqResp req resp).
StateToken st -> Word
sizeLimitForState :: forall (st :: ReqResp req resp). StateToken st -> Word
sizeLimitForState StateToken st
_ = Word
forall a. Bounded a => a
maxBound
reqRespTimeLimits :: forall req resp. ProtocolTimeLimits (ReqResp req resp)
reqRespTimeLimits :: forall {k} {k1} (req :: k) (resp :: k1).
ProtocolTimeLimits (ReqResp req resp)
reqRespTimeLimits = ProtocolTimeLimits { StateToken st -> Maybe DiffTime
forall (st :: ReqResp req resp).
ActiveState st =>
StateToken st -> Maybe DiffTime
timeLimitForState :: forall (st :: ReqResp req resp).
ActiveState st =>
StateToken st -> Maybe DiffTime
timeLimitForState :: forall (st :: ReqResp req resp).
ActiveState st =>
StateToken st -> Maybe DiffTime
timeLimitForState }
where
timeLimitForState :: forall (st :: ReqResp req resp).
ActiveState st
=> StateToken st -> Maybe DiffTime
timeLimitForState :: forall (st :: ReqResp req resp).
ActiveState st =>
StateToken st -> Maybe DiffTime
timeLimitForState StateToken st
SReqResp st
SingIdle = Maybe DiffTime
forall a. Maybe a
Nothing
timeLimitForState StateToken st
SReqResp st
SingBusy = DiffTime -> Maybe DiffTime
forall a. a -> Maybe a
Just DiffTime
60
timeLimitForState a :: StateToken st
a@StateToken st
SReqResp st
ReqResp.SingDone = StateToken 'StDone -> forall a. a
forall ps (st :: ps).
(StateAgency st ~ 'NobodyAgency, ActiveState st) =>
StateToken st -> forall a. a
notActiveState StateToken st
StateToken 'StDone
a
runInitiatorProtocols
:: forall muxMode addr m a b.
( Alternative (STM m)
, MonadAsync m
, MonadCatch m
, MonadLabelledSTM m
, MonadMask m
, MonadSTM m
, MonadThrow (STM m)
, HasInitiator muxMode ~ True
, MonadSay m
)
=> SingMuxMode muxMode
-> Mx.Mux muxMode m
-> OuroborosBundle muxMode (ExpandedInitiatorContext addr m)
(ResponderContext addr)
ByteString m a b
-> TemperatureBundle (StrictTVar m ControlMessage)
-> ConnectionId addr
-> m (TemperatureBundle a)
runInitiatorProtocols :: forall (muxMode :: Mode) addr (m :: * -> *) a b.
(Alternative (STM m), MonadAsync m, MonadCatch m,
MonadLabelledSTM m, MonadMask m, MonadSTM m, MonadThrow (STM m),
HasInitiator muxMode ~ 'True, MonadSay m) =>
SingMuxMode muxMode
-> Mux muxMode m
-> OuroborosBundle
muxMode
(ExpandedInitiatorContext addr m)
(ResponderContext addr)
ByteString
m
a
b
-> TemperatureBundle (StrictTVar m ControlMessage)
-> ConnectionId addr
-> m (TemperatureBundle a)
runInitiatorProtocols SingMuxMode muxMode
singMuxMode Mux muxMode m
mux OuroborosBundle
muxMode
(ExpandedInitiatorContext addr m)
(ResponderContext addr)
ByteString
m
a
b
bundle TemperatureBundle (StrictTVar m ControlMessage)
controlBundle ConnectionId addr
connId = do
bundle' <- ((MiniProtocolWithExpandedCtx muxMode addr ByteString m a b,
STM m ControlMessage)
-> m (STM m (Either SomeException a)))
-> TemperatureBundle
(MiniProtocolWithExpandedCtx muxMode addr ByteString m a b,
STM m ControlMessage)
-> m (TemperatureBundle (STM m (Either SomeException a)))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> TemperatureBundle a -> f (TemperatureBundle b)
traverse ((MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
-> STM m ControlMessage -> m (STM m (Either SomeException a)))
-> (MiniProtocolWithExpandedCtx muxMode addr ByteString m a b,
STM m ControlMessage)
-> m (STM m (Either SomeException a))
forall a b c. (a -> b -> c) -> (a, b) -> c
uncurry MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
-> STM m ControlMessage -> m (STM m (Either SomeException a))
runInitiator) ((,) (MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
-> STM m ControlMessage
-> (MiniProtocolWithExpandedCtx muxMode addr ByteString m a b,
STM m ControlMessage))
-> TemperatureBundle
(MiniProtocolWithExpandedCtx muxMode addr ByteString m a b)
-> TemperatureBundle
(STM m ControlMessage
-> (MiniProtocolWithExpandedCtx muxMode addr ByteString m a b,
STM m ControlMessage))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ([MiniProtocolWithExpandedCtx muxMode addr ByteString m a b]
-> MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
forall a. HasCallStack => [a] -> a
head ([MiniProtocolWithExpandedCtx muxMode addr ByteString m a b]
-> MiniProtocolWithExpandedCtx muxMode addr ByteString m a b)
-> OuroborosBundle
muxMode
(ExpandedInitiatorContext addr m)
(ResponderContext addr)
ByteString
m
a
b
-> TemperatureBundle
(MiniProtocolWithExpandedCtx muxMode addr ByteString m a b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> OuroborosBundle
muxMode
(ExpandedInitiatorContext addr m)
(ResponderContext addr)
ByteString
m
a
b
bundle)
TemperatureBundle
(STM m ControlMessage
-> (MiniProtocolWithExpandedCtx muxMode addr ByteString m a b,
STM m ControlMessage))
-> TemperatureBundle (STM m ControlMessage)
-> TemperatureBundle
(MiniProtocolWithExpandedCtx muxMode addr ByteString m a b,
STM m ControlMessage)
forall a b.
TemperatureBundle (a -> b)
-> TemperatureBundle a -> TemperatureBundle b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> (StrictTVar m ControlMessage -> STM m ControlMessage
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar (StrictTVar m ControlMessage -> STM m ControlMessage)
-> TemperatureBundle (StrictTVar m ControlMessage)
-> TemperatureBundle (STM m ControlMessage)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TemperatureBundle (StrictTVar m ControlMessage)
controlBundle))
traverse (atomically >=> either throwIO return)
bundle'
where
runInitiator :: MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
-> ControlMessageSTM m
-> m (STM m (Either SomeException a))
runInitiator :: MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
-> STM m ControlMessage -> m (STM m (Either SomeException a))
runInitiator MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
ptcl STM m ControlMessage
controlMessage =
Mux muxMode m
-> MiniProtocolNum
-> MiniProtocolDirection muxMode
-> StartOnDemandOrEagerly
-> (ByteChannel m -> m (a, Maybe ByteString))
-> m (STM m (Either SomeException a))
forall (mode :: Mode) (m :: * -> *) a.
(Alternative (STM m), MonadSTM m, MonadThrow m,
MonadThrow (STM m)) =>
Mux mode m
-> MiniProtocolNum
-> MiniProtocolDirection mode
-> StartOnDemandOrEagerly
-> (ByteChannel m -> m (a, Maybe ByteString))
-> m (STM m (Either SomeException a))
Mx.runMiniProtocol
Mux muxMode m
mux
(MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
-> MiniProtocolNum
forall (mode :: Mode) initiatorCtx responderCtx bytes (m :: * -> *)
a b.
MiniProtocol mode initiatorCtx responderCtx bytes m a b
-> MiniProtocolNum
miniProtocolNum MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
ptcl)
(case SingMuxMode muxMode
singMuxMode of
SingMuxMode muxMode
SingInitiatorMode -> MiniProtocolDirection muxMode
MiniProtocolDirection 'InitiatorMode
Mx.InitiatorDirectionOnly
SingMuxMode muxMode
SingInitiatorResponderMode -> MiniProtocolDirection muxMode
MiniProtocolDirection 'InitiatorResponderMode
Mx.InitiatorDirection)
StartOnDemandOrEagerly
Mx.StartEagerly
(MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
-> ExpandedInitiatorContext addr m
-> ByteChannel m
-> m (a, Maybe ByteString)
forall ctx bytes (m :: * -> *) a.
MiniProtocolCb ctx bytes m a
-> ctx -> Channel m bytes -> m (a, Maybe bytes)
runMiniProtocolCb
(case MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
-> RunMiniProtocol
muxMode
(ExpandedInitiatorContext addr m)
(ResponderContext addr)
ByteString
m
a
b
forall (mode :: Mode) initiatorCtx responderCtx bytes (m :: * -> *)
a b.
MiniProtocol mode initiatorCtx responderCtx bytes m a b
-> RunMiniProtocol mode initiatorCtx responderCtx bytes m a b
miniProtocolRun MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
ptcl of
InitiatorProtocolOnly MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
initiator -> MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
initiator
InitiatorAndResponderProtocol MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
initiator MiniProtocolCb (ResponderContext addr) ByteString m b
_ -> MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
initiator)
ExpandedInitiatorContext addr m
initiatorCtx)
where
initiatorCtx :: ExpandedInitiatorContext addr m
initiatorCtx = ExpandedInitiatorContext {
eicConnectionId :: ConnectionId addr
eicConnectionId = ConnectionId addr
connId,
eicControlMessage :: STM m ControlMessage
eicControlMessage = STM m ControlMessage
controlMessage,
eicIsBigLedgerPeer :: IsBigLedgerPeer
eicIsBigLedgerPeer = IsBigLedgerPeer
IsNotBigLedgerPeer
}
maxAcceptedConnectionsLimit :: AcceptedConnectionsLimit
maxAcceptedConnectionsLimit :: AcceptedConnectionsLimit
maxAcceptedConnectionsLimit = Word32 -> Word32 -> DiffTime -> AcceptedConnectionsLimit
AcceptedConnectionsLimit Word32
forall a. Bounded a => a
maxBound Word32
forall a. Bounded a => a
maxBound DiffTime
0
unidirectionalExperiment
:: forall peerAddr socket acc req resp m.
( ConnectionManagerMonad m
, MonadAsync m
, MonadDelay m
, MonadFix m
, MonadLabelledSTM m
, MonadTraceSTM m
, MonadSay m
, acc ~ [req], resp ~ [req]
, Ord peerAddr, Show peerAddr, Typeable peerAddr, Eq peerAddr
, Hashable peerAddr
, Serialise req, Show req
, Serialise resp, Show resp, Eq resp
, Typeable req, Typeable resp
)
=> StdGen
-> Timeouts
-> Snocket m socket peerAddr
-> Mx.MakeBearer m socket
-> (socket -> m ())
-> socket
-> ClientAndServerData req
-> m Property
unidirectionalExperiment :: forall peerAddr socket acc req resp (m :: * -> *).
(ConnectionManagerMonad m, MonadAsync m, MonadDelay m, MonadFix m,
MonadLabelledSTM m, MonadTraceSTM m, MonadSay m, acc ~ [req],
resp ~ [req], Ord peerAddr, Show peerAddr, Typeable peerAddr,
Eq peerAddr, Hashable peerAddr, Serialise req, Show req,
Serialise resp, Show resp, Eq resp, Typeable req, Typeable resp) =>
StdGen
-> Timeouts
-> Snocket m socket peerAddr
-> MakeBearer m socket
-> (socket -> m ())
-> socket
-> ClientAndServerData req
-> m Property
unidirectionalExperiment StdGen
stdGen Timeouts
timeouts Snocket m socket peerAddr
snocket MakeBearer m socket
makeBearer socket -> m ()
confSock socket
socket ClientAndServerData req
clientAndServerData = do
let (StdGen
stdGen', StdGen
stdGen'') = StdGen -> (StdGen, StdGen)
forall g. RandomGen g => g -> (g, g)
Random.split StdGen
stdGen
nextReqs <- ClientAndServerData req
-> m (TemperatureBundle (ConnectionId peerAddr -> STM m [req]))
forall req peerAddr (m :: * -> *).
MonadSTM m =>
ClientAndServerData req
-> m (TemperatureBundle (ConnectionId peerAddr -> STM m [req]))
oneshotNextRequests ClientAndServerData req
clientAndServerData
connStateIdSupply <- atomically $ CM.newConnStateIdSupply (Proxy @m)
withInitiatorOnlyConnectionManager
"client" timeouts nullTracer nullTracer stdGen' snocket makeBearer connStateIdSupply Nothing nextReqs
timeLimitsHandshake maxAcceptedConnectionsLimit
$ \ConnectionManagerWithExpandedCtx
'InitiatorMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
Void
connectionManager ->
String
-> Timeouts
-> Tracer m (WithName String (RemoteTransitionTrace peerAddr))
-> Tracer m (WithName String (AbstractTransitionTrace ConnStateId))
-> Tracer
m
(WithName
String
(Trace
peerAddr
(ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
-> Tracer m (WithName String (Trace peerAddr))
-> Tracers' m (WithNameAndBearer String peerAddr)
-> Tracer m (WithName String (Debug peerAddr DataFlowProtocolData))
-> StdGen
-> Snocket m socket peerAddr
-> MakeBearer m socket
-> ConnStateIdSupply m
-> (socket -> m ())
-> socket
-> Maybe peerAddr
-> [req]
-> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
-> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
-> AcceptedConnectionsLimit
-> (ConnectionManager
'InitiatorResponderMode
socket
peerAddr
(Handle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
DataFlowProtocolData
ByteString
m
[[req]]
[req])
(HandlerError UnversionedProtocol)
m
-> peerAddr -> Async m Void -> m Property)
-> m Property
forall name peerAddr socket acc req resp (m :: * -> *) a.
(ConnectionManagerMonad m, acc ~ [req], resp ~ [req], Ord peerAddr,
Show peerAddr, Typeable peerAddr, Serialise req, Typeable req,
MonadAsync m, MonadDelay m, MonadFix m, MonadLabelledSTM m,
MonadTraceSTM m, MonadSay m, Show req, Show name) =>
name
-> Timeouts
-> Tracer m (WithName name (RemoteTransitionTrace peerAddr))
-> Tracer m (WithName name (AbstractTransitionTrace ConnStateId))
-> Tracer
m
(WithName
name
(Trace
peerAddr
(ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
-> Tracer m (WithName name (Trace peerAddr))
-> Tracers' m (WithNameAndBearer name peerAddr)
-> Tracer m (WithName name (Debug peerAddr DataFlowProtocolData))
-> StdGen
-> Snocket m socket peerAddr
-> MakeBearer m socket
-> ConnStateIdSupply m
-> (socket -> m ())
-> socket
-> Maybe peerAddr
-> acc
-> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
-> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
-> AcceptedConnectionsLimit
-> (ConnectionManagerWithExpandedCtx
'InitiatorResponderMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
acc
-> peerAddr -> Async m Void -> m a)
-> m a
withBidirectionalConnectionManager String
"server" Timeouts
timeouts
Tracer m (WithName String (RemoteTransitionTrace peerAddr))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer Tracer m (WithName String (AbstractTransitionTrace ConnStateId))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer Tracer
m
(WithName
String
(Trace
peerAddr
(ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
Tracer m (WithName String (Trace peerAddr))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer Tracers' m (WithNameAndBearer String peerAddr)
forall (m :: * -> *) (f :: * -> *). Applicative m => Tracers' m f
Mx.nullTracers Tracer m (WithName String (Debug peerAddr DataFlowProtocolData))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
StdGen
stdGen''
Snocket m socket peerAddr
snocket MakeBearer m socket
makeBearer ConnStateIdSupply m
connStateIdSupply
socket -> m ()
confSock socket
socket Maybe peerAddr
forall a. Maybe a
Nothing
[ClientAndServerData req -> req
forall req. ClientAndServerData req -> req
accumulatorInit ClientAndServerData req
clientAndServerData]
TemperatureBundle (ConnectionId peerAddr -> STM m [req])
forall (stm :: * -> *) req peerAddr.
Applicative stm =>
TemperatureBundle (ConnectionId peerAddr -> stm [req])
noNextRequests
ProtocolTimeLimits (Handshake UnversionedProtocol Term)
forall {k} (vNumber :: k).
ProtocolTimeLimits (Handshake vNumber Term)
timeLimitsHandshake
AcceptedConnectionsLimit
maxAcceptedConnectionsLimit
((ConnectionManager
'InitiatorResponderMode
socket
peerAddr
(Handle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
DataFlowProtocolData
ByteString
m
[[req]]
[req])
(HandlerError UnversionedProtocol)
m
-> peerAddr -> Async m Void -> m Property)
-> m Property)
-> (ConnectionManager
'InitiatorResponderMode
socket
peerAddr
(Handle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
DataFlowProtocolData
ByteString
m
[[req]]
[req])
(HandlerError UnversionedProtocol)
m
-> peerAddr -> Async m Void -> m Property)
-> m Property
forall a b. (a -> b) -> a -> b
$ \ConnectionManager
'InitiatorResponderMode
socket
peerAddr
(Handle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
DataFlowProtocolData
ByteString
m
[[req]]
[req])
(HandlerError UnversionedProtocol)
m
_ peerAddr
serverAddr Async m Void
_serverAsync -> do
(rs :: [Either SomeException (TemperatureBundle [resp])]) <-
Int
-> m (Either SomeException (TemperatureBundle [resp]))
-> m [Either SomeException (TemperatureBundle [resp])]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM
(ClientAndServerData req -> Int
forall req. ClientAndServerData req -> Int
numberOfRounds ClientAndServerData req
clientAndServerData)
(m (Connected
peerAddr
(HandleWithExpandedCtx
'InitiatorMode
peerAddr
DataFlowProtocolData
ByteString
m
[resp]
Void)
(HandlerError UnversionedProtocol))
-> (Connected
peerAddr
(HandleWithExpandedCtx
'InitiatorMode
peerAddr
DataFlowProtocolData
ByteString
m
[resp]
Void)
(HandlerError UnversionedProtocol)
-> m (OperationResult AbstractState))
-> (Connected
peerAddr
(HandleWithExpandedCtx
'InitiatorMode
peerAddr
DataFlowProtocolData
ByteString
m
[resp]
Void)
(HandlerError UnversionedProtocol)
-> m (Either SomeException (TemperatureBundle [resp])))
-> m (Either SomeException (TemperatureBundle [resp]))
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket
(ConnectionManagerWithExpandedCtx
'InitiatorMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
Void
-> AcquireOutboundConnection
peerAddr
(HandleWithExpandedCtx
'InitiatorMode
peerAddr
DataFlowProtocolData
ByteString
m
[resp]
Void)
(HandlerError UnversionedProtocol)
m
forall (muxMode :: Mode) socket peerAddr handle handleError
(m :: * -> *).
(HasInitiator muxMode ~ 'True) =>
ConnectionManager muxMode socket peerAddr handle handleError m
-> AcquireOutboundConnection peerAddr handle handleError m
acquireOutboundConnection ConnectionManagerWithExpandedCtx
'InitiatorMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
Void
connectionManager DiffusionMode
InitiatorOnlyDiffusionMode peerAddr
serverAddr)
(\case
Connected ConnectionId peerAddr
connId DataFlow
_ HandleWithExpandedCtx
'InitiatorMode
peerAddr
DataFlowProtocolData
ByteString
m
[resp]
Void
_ -> ConnectionManagerWithExpandedCtx
'InitiatorMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
Void
-> ConnectionId peerAddr -> m (OperationResult AbstractState)
forall (muxMode :: Mode) socket peerAddr handle handleError
(m :: * -> *).
(HasInitiator muxMode ~ 'True) =>
ConnectionManager muxMode socket peerAddr handle handleError m
-> ConnectionId peerAddr -> m (OperationResult AbstractState)
releaseOutboundConnection ConnectionManagerWithExpandedCtx
'InitiatorMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
Void
connectionManager ConnectionId peerAddr
connId
Disconnected {} -> String -> m (OperationResult AbstractState)
forall a. HasCallStack => String -> a
error String
"unidirectionalExperiment: impossible happened")
(\Connected
peerAddr
(HandleWithExpandedCtx
'InitiatorMode
peerAddr
DataFlowProtocolData
ByteString
m
[resp]
Void)
(HandlerError UnversionedProtocol)
connHandle -> do
case Connected
peerAddr
(HandleWithExpandedCtx
'InitiatorMode
peerAddr
DataFlowProtocolData
ByteString
m
[resp]
Void)
(HandlerError UnversionedProtocol)
connHandle of
Connected ConnectionId peerAddr
connId DataFlow
_ (Handle Mux 'InitiatorMode m
mux OuroborosBundle
'InitiatorMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
Void
muxBundle TemperatureBundle (StrictTVar m ControlMessage)
controlBundle DataFlowProtocolData
_
:: HandleWithExpandedCtx Mx.InitiatorMode peerAddr
DataFlowProtocolData ByteString m [resp] Void) ->
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try @_ @SomeException (m (TemperatureBundle [resp])
-> m (Either SomeException (TemperatureBundle [resp])))
-> m (TemperatureBundle [resp])
-> m (Either SomeException (TemperatureBundle [resp]))
forall a b. (a -> b) -> a -> b
$
(SingMuxMode 'InitiatorMode
-> Mux 'InitiatorMode m
-> OuroborosBundle
'InitiatorMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
Void
-> TemperatureBundle (StrictTVar m ControlMessage)
-> ConnectionId peerAddr
-> m (TemperatureBundle [resp])
forall (muxMode :: Mode) addr (m :: * -> *) a b.
(Alternative (STM m), MonadAsync m, MonadCatch m,
MonadLabelledSTM m, MonadMask m, MonadSTM m, MonadThrow (STM m),
HasInitiator muxMode ~ 'True, MonadSay m) =>
SingMuxMode muxMode
-> Mux muxMode m
-> OuroborosBundle
muxMode
(ExpandedInitiatorContext addr m)
(ResponderContext addr)
ByteString
m
a
b
-> TemperatureBundle (StrictTVar m ControlMessage)
-> ConnectionId addr
-> m (TemperatureBundle a)
runInitiatorProtocols
SingMuxMode 'InitiatorMode
SingInitiatorMode Mux 'InitiatorMode m
mux OuroborosBundle
'InitiatorMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
Void
muxBundle TemperatureBundle (StrictTVar m ControlMessage)
controlBundle ConnectionId peerAddr
connId
:: m (TemperatureBundle [resp])
)
Disconnected ConnectionId peerAddr
_ DisconnectionException (HandlerError UnversionedProtocol)
err ->
IOError -> m (Either SomeException (TemperatureBundle [resp]))
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (String -> IOError
userError (String -> IOError) -> String -> IOError
forall a b. (a -> b) -> a -> b
$ String
"unidirectionalExperiment: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ DisconnectionException (HandlerError UnversionedProtocol) -> String
forall a. Show a => a -> String
show DisconnectionException (HandlerError UnversionedProtocol)
err))
)
pure $
foldr
(\ (Either SomeException (TemperatureBundle [resp])
r, TemperatureBundle [resp]
expected) Property
acc ->
case Either SomeException (TemperatureBundle [resp])
r of
Left SomeException
err -> String -> Bool -> Property
forall prop. Testable prop => String -> prop -> Property
counterexample (SomeException -> String
forall a. Show a => a -> String
show SomeException
err) Bool
False
Right TemperatureBundle [resp]
a -> TemperatureBundle [resp]
a TemperatureBundle [resp] -> TemperatureBundle [resp] -> Property
forall a. (Eq a, Show a) => a -> a -> Property
=== TemperatureBundle [resp]
expected Property -> Property -> Property
forall prop1 prop2.
(Testable prop1, Testable prop2) =>
prop1 -> prop2 -> Property
.&&. Property
acc)
(property True)
$ zip rs (expectedResult clientAndServerData clientAndServerData)
bidirectionalExperiment
:: forall peerAddr socket acc req resp m.
( ConnectionManagerMonad m
, MonadAsync m
, MonadDelay m
, MonadFix m
, MonadLabelledSTM m
, MonadTraceSTM m
, MonadSay m
, acc ~ [req], resp ~ [req]
, Ord peerAddr, Show peerAddr, Typeable peerAddr, Eq peerAddr
, Hashable peerAddr
, Serialise req, Show req
, Serialise resp, Show resp, Eq resp
, Typeable req, Typeable resp
, Show acc
)
=> Bool
-> StdGen
-> Timeouts
-> Snocket m socket peerAddr
-> Mx.MakeBearer m socket
-> (socket -> m ())
-> socket
-> socket
-> peerAddr
-> peerAddr
-> ClientAndServerData req
-> ClientAndServerData req
-> m Property
bidirectionalExperiment :: forall peerAddr socket acc req resp (m :: * -> *).
(ConnectionManagerMonad m, MonadAsync m, MonadDelay m, MonadFix m,
MonadLabelledSTM m, MonadTraceSTM m, MonadSay m, acc ~ [req],
resp ~ [req], Ord peerAddr, Show peerAddr, Typeable peerAddr,
Eq peerAddr, Hashable peerAddr, Serialise req, Show req,
Serialise resp, Show resp, Eq resp, Typeable req, Typeable resp,
Show acc) =>
Bool
-> StdGen
-> Timeouts
-> Snocket m socket peerAddr
-> MakeBearer m socket
-> (socket -> m ())
-> socket
-> socket
-> peerAddr
-> peerAddr
-> ClientAndServerData req
-> ClientAndServerData req
-> m Property
bidirectionalExperiment
Bool
useLock StdGen
stdGen Timeouts
timeouts Snocket m socket peerAddr
snocket MakeBearer m socket
makeBearer socket -> m ()
confSock socket
socket0 socket
socket1 peerAddr
localAddr0 peerAddr
localAddr1
ClientAndServerData req
clientAndServerData0 ClientAndServerData req
clientAndServerData1 = do
let (StdGen
stdGen', StdGen
stdGen'') = StdGen -> (StdGen, StdGen)
forall g. RandomGen g => g -> (g, g)
Random.split StdGen
stdGen
lock <- () -> m (StrictTMVar m ())
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTMVar m a)
newTMVarIO ()
connStateIdSupply <- atomically $ CM.newConnStateIdSupply (Proxy @m)
nextRequests0 <- oneshotNextRequests clientAndServerData0
nextRequests1 <- oneshotNextRequests clientAndServerData1
withBidirectionalConnectionManager "node-0" timeouts
nullTracer nullTracer nullTracer nullTracer Mx.nullTracers
nullTracer stdGen' snocket makeBearer
connStateIdSupply confSock
socket0 (Just localAddr0)
[accumulatorInit clientAndServerData0]
nextRequests0
noTimeLimitsHandshake
maxAcceptedConnectionsLimit
(\ConnectionManagerWithExpandedCtx
'InitiatorResponderMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
[req]
connectionManager0 peerAddr
_serverAddr0 Async m Void
_serverAsync0 -> do
String
-> Timeouts
-> Tracer m (WithName String (RemoteTransitionTrace peerAddr))
-> Tracer m (WithName String (AbstractTransitionTrace ConnStateId))
-> Tracer
m
(WithName
String
(Trace
peerAddr
(ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
-> Tracer m (WithName String (Trace peerAddr))
-> Tracers' m (WithNameAndBearer String peerAddr)
-> Tracer m (WithName String (Debug peerAddr DataFlowProtocolData))
-> StdGen
-> Snocket m socket peerAddr
-> MakeBearer m socket
-> ConnStateIdSupply m
-> (socket -> m ())
-> socket
-> Maybe peerAddr
-> [req]
-> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
-> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
-> AcceptedConnectionsLimit
-> (ConnectionManagerWithExpandedCtx
'InitiatorResponderMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
[req]
-> peerAddr -> Async m Void -> m Property)
-> m Property
forall name peerAddr socket acc req resp (m :: * -> *) a.
(ConnectionManagerMonad m, acc ~ [req], resp ~ [req], Ord peerAddr,
Show peerAddr, Typeable peerAddr, Serialise req, Typeable req,
MonadAsync m, MonadDelay m, MonadFix m, MonadLabelledSTM m,
MonadTraceSTM m, MonadSay m, Show req, Show name) =>
name
-> Timeouts
-> Tracer m (WithName name (RemoteTransitionTrace peerAddr))
-> Tracer m (WithName name (AbstractTransitionTrace ConnStateId))
-> Tracer
m
(WithName
name
(Trace
peerAddr
(ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
-> Tracer m (WithName name (Trace peerAddr))
-> Tracers' m (WithNameAndBearer name peerAddr)
-> Tracer m (WithName name (Debug peerAddr DataFlowProtocolData))
-> StdGen
-> Snocket m socket peerAddr
-> MakeBearer m socket
-> ConnStateIdSupply m
-> (socket -> m ())
-> socket
-> Maybe peerAddr
-> acc
-> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
-> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
-> AcceptedConnectionsLimit
-> (ConnectionManagerWithExpandedCtx
'InitiatorResponderMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
acc
-> peerAddr -> Async m Void -> m a)
-> m a
withBidirectionalConnectionManager String
"node-1" Timeouts
timeouts
Tracer m (WithName String (RemoteTransitionTrace peerAddr))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer Tracer m (WithName String (AbstractTransitionTrace ConnStateId))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer Tracer
m
(WithName
String
(Trace
peerAddr
(ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer Tracer m (WithName String (Trace peerAddr))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer Tracers' m (WithNameAndBearer String peerAddr)
forall (m :: * -> *) (f :: * -> *). Applicative m => Tracers' m f
Mx.nullTracers
Tracer m (WithName String (Debug peerAddr DataFlowProtocolData))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer StdGen
stdGen'' Snocket m socket peerAddr
snocket MakeBearer m socket
makeBearer
ConnStateIdSupply m
connStateIdSupply socket -> m ()
confSock
socket
socket1 (peerAddr -> Maybe peerAddr
forall a. a -> Maybe a
Just peerAddr
localAddr1)
[ClientAndServerData req -> req
forall req. ClientAndServerData req -> req
accumulatorInit ClientAndServerData req
clientAndServerData1]
TemperatureBundle (ConnectionId peerAddr -> STM m [req])
nextRequests1
ProtocolTimeLimits (Handshake UnversionedProtocol Term)
forall {k} (vNumber :: k).
ProtocolTimeLimits (Handshake vNumber Term)
noTimeLimitsHandshake
AcceptedConnectionsLimit
maxAcceptedConnectionsLimit
(\ConnectionManagerWithExpandedCtx
'InitiatorResponderMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
[req]
connectionManager1 peerAddr
_serverAddr1 Async m Void
serverAsync1 -> do
Async m Void -> m ()
forall (m :: * -> *) a.
(MonadAsync m, MonadFork m, MonadMask m) =>
Async m a -> m ()
link Async m Void
serverAsync1
( rs0 :: [Either SomeException (TemperatureBundle [resp])]
, rs1 :: [Either SomeException (TemperatureBundle [resp])]
) <-
Int
-> m (Either SomeException (TemperatureBundle [resp]))
-> m [Either SomeException (TemperatureBundle [resp])]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM
(ClientAndServerData req -> Int
forall req. ClientAndServerData req -> Int
numberOfRounds ClientAndServerData req
clientAndServerData0)
(m (Connected
peerAddr
(HandleWithExpandedCtx
'InitiatorResponderMode
peerAddr
DataFlowProtocolData
ByteString
m
[resp]
[req])
(HandlerError UnversionedProtocol))
-> (Connected
peerAddr
(HandleWithExpandedCtx
'InitiatorResponderMode
peerAddr
DataFlowProtocolData
ByteString
m
[resp]
[req])
(HandlerError UnversionedProtocol)
-> m (OperationResult AbstractState))
-> (Connected
peerAddr
(HandleWithExpandedCtx
'InitiatorResponderMode
peerAddr
DataFlowProtocolData
ByteString
m
[resp]
[req])
(HandlerError UnversionedProtocol)
-> m (Either SomeException (TemperatureBundle [resp])))
-> m (Either SomeException (TemperatureBundle [resp]))
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket
(Bool
-> StrictTMVar m ()
-> m (Connected
peerAddr
(HandleWithExpandedCtx
'InitiatorResponderMode
peerAddr
DataFlowProtocolData
ByteString
m
[resp]
[req])
(HandlerError UnversionedProtocol))
-> m (Connected
peerAddr
(HandleWithExpandedCtx
'InitiatorResponderMode
peerAddr
DataFlowProtocolData
ByteString
m
[resp]
[req])
(HandlerError UnversionedProtocol))
forall (m :: * -> *) a.
(MonadSTM m, MonadThrow m) =>
Bool -> StrictTMVar m () -> m a -> m a
withLock Bool
useLock StrictTMVar m ()
lock
(ConnectionManagerWithExpandedCtx
'InitiatorResponderMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
[req]
-> AcquireOutboundConnection
peerAddr
(HandleWithExpandedCtx
'InitiatorResponderMode
peerAddr
DataFlowProtocolData
ByteString
m
[resp]
[req])
(HandlerError UnversionedProtocol)
m
forall (muxMode :: Mode) socket peerAddr handle handleError
(m :: * -> *).
(HasInitiator muxMode ~ 'True) =>
ConnectionManager muxMode socket peerAddr handle handleError m
-> AcquireOutboundConnection peerAddr handle handleError m
acquireOutboundConnection
ConnectionManagerWithExpandedCtx
'InitiatorResponderMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
[req]
connectionManager0
DiffusionMode
InitiatorAndResponderDiffusionMode
peerAddr
localAddr1))
(\case
Connected ConnectionId peerAddr
connId DataFlow
_ HandleWithExpandedCtx
'InitiatorResponderMode
peerAddr
DataFlowProtocolData
ByteString
m
[resp]
[req]
_ ->
ConnectionManagerWithExpandedCtx
'InitiatorResponderMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
[req]
-> ConnectionId peerAddr -> m (OperationResult AbstractState)
forall (muxMode :: Mode) socket peerAddr handle handleError
(m :: * -> *).
(HasInitiator muxMode ~ 'True) =>
ConnectionManager muxMode socket peerAddr handle handleError m
-> ConnectionId peerAddr -> m (OperationResult AbstractState)
releaseOutboundConnection
ConnectionManagerWithExpandedCtx
'InitiatorResponderMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
[req]
connectionManager0
ConnectionId peerAddr
connId
Disconnected {} ->
String -> m (OperationResult AbstractState)
forall a. HasCallStack => String -> a
error String
"bidirectionalExperiment: impossible happened")
(\Connected
peerAddr
(HandleWithExpandedCtx
'InitiatorResponderMode
peerAddr
DataFlowProtocolData
ByteString
m
[resp]
[req])
(HandlerError UnversionedProtocol)
connHandle ->
case Connected
peerAddr
(HandleWithExpandedCtx
'InitiatorResponderMode
peerAddr
DataFlowProtocolData
ByteString
m
[resp]
[req])
(HandlerError UnversionedProtocol)
connHandle of
Connected ConnectionId peerAddr
connId DataFlow
_ (Handle Mux 'InitiatorResponderMode m
mux OuroborosBundle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
[req]
muxBundle TemperatureBundle (StrictTVar m ControlMessage)
controlBundle DataFlowProtocolData
_) -> do
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try @_ @SomeException (m (TemperatureBundle [resp])
-> m (Either SomeException (TemperatureBundle [resp])))
-> m (TemperatureBundle [resp])
-> m (Either SomeException (TemperatureBundle [resp]))
forall a b. (a -> b) -> a -> b
$
SingMuxMode 'InitiatorResponderMode
-> Mux 'InitiatorResponderMode m
-> OuroborosBundle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
[req]
-> TemperatureBundle (StrictTVar m ControlMessage)
-> ConnectionId peerAddr
-> m (TemperatureBundle [resp])
forall (muxMode :: Mode) addr (m :: * -> *) a b.
(Alternative (STM m), MonadAsync m, MonadCatch m,
MonadLabelledSTM m, MonadMask m, MonadSTM m, MonadThrow (STM m),
HasInitiator muxMode ~ 'True, MonadSay m) =>
SingMuxMode muxMode
-> Mux muxMode m
-> OuroborosBundle
muxMode
(ExpandedInitiatorContext addr m)
(ResponderContext addr)
ByteString
m
a
b
-> TemperatureBundle (StrictTVar m ControlMessage)
-> ConnectionId addr
-> m (TemperatureBundle a)
runInitiatorProtocols
SingMuxMode 'InitiatorResponderMode
SingInitiatorResponderMode
Mux 'InitiatorResponderMode m
mux OuroborosBundle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
[req]
muxBundle TemperatureBundle (StrictTVar m ControlMessage)
controlBundle ConnectionId peerAddr
connId
Disconnected ConnectionId peerAddr
_ DisconnectionException (HandlerError UnversionedProtocol)
err ->
IOError -> m (Either SomeException (TemperatureBundle [resp]))
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (String -> IOError
userError (String -> IOError) -> String -> IOError
forall a b. (a -> b) -> a -> b
$ String
"bidirectionalExperiment: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ DisconnectionException (HandlerError UnversionedProtocol) -> String
forall a. Show a => a -> String
show DisconnectionException (HandlerError UnversionedProtocol)
err)
))
m [Either SomeException (TemperatureBundle [resp])]
-> m [Either SomeException (TemperatureBundle [resp])]
-> m ([Either SomeException (TemperatureBundle [resp])],
[Either SomeException (TemperatureBundle [resp])])
forall a b. m a -> m b -> m (a, b)
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m (a, b)
`concurrently`
Int
-> m (Either SomeException (TemperatureBundle [resp]))
-> m [Either SomeException (TemperatureBundle [resp])]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM
(ClientAndServerData req -> Int
forall req. ClientAndServerData req -> Int
numberOfRounds ClientAndServerData req
clientAndServerData1)
(m (Connected
peerAddr
(HandleWithExpandedCtx
'InitiatorResponderMode
peerAddr
DataFlowProtocolData
ByteString
m
[resp]
[req])
(HandlerError UnversionedProtocol))
-> (Connected
peerAddr
(HandleWithExpandedCtx
'InitiatorResponderMode
peerAddr
DataFlowProtocolData
ByteString
m
[resp]
[req])
(HandlerError UnversionedProtocol)
-> m (OperationResult AbstractState))
-> (Connected
peerAddr
(HandleWithExpandedCtx
'InitiatorResponderMode
peerAddr
DataFlowProtocolData
ByteString
m
[resp]
[req])
(HandlerError UnversionedProtocol)
-> m (Either SomeException (TemperatureBundle [resp])))
-> m (Either SomeException (TemperatureBundle [resp]))
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket
(Bool
-> StrictTMVar m ()
-> m (Connected
peerAddr
(HandleWithExpandedCtx
'InitiatorResponderMode
peerAddr
DataFlowProtocolData
ByteString
m
[resp]
[req])
(HandlerError UnversionedProtocol))
-> m (Connected
peerAddr
(HandleWithExpandedCtx
'InitiatorResponderMode
peerAddr
DataFlowProtocolData
ByteString
m
[resp]
[req])
(HandlerError UnversionedProtocol))
forall (m :: * -> *) a.
(MonadSTM m, MonadThrow m) =>
Bool -> StrictTMVar m () -> m a -> m a
withLock Bool
useLock StrictTMVar m ()
lock
(ConnectionManagerWithExpandedCtx
'InitiatorResponderMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
[req]
-> AcquireOutboundConnection
peerAddr
(HandleWithExpandedCtx
'InitiatorResponderMode
peerAddr
DataFlowProtocolData
ByteString
m
[resp]
[req])
(HandlerError UnversionedProtocol)
m
forall (muxMode :: Mode) socket peerAddr handle handleError
(m :: * -> *).
(HasInitiator muxMode ~ 'True) =>
ConnectionManager muxMode socket peerAddr handle handleError m
-> AcquireOutboundConnection peerAddr handle handleError m
acquireOutboundConnection
ConnectionManagerWithExpandedCtx
'InitiatorResponderMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
[req]
connectionManager1
DiffusionMode
InitiatorAndResponderDiffusionMode
peerAddr
localAddr0))
(\case
Connected ConnectionId peerAddr
connId DataFlow
_ HandleWithExpandedCtx
'InitiatorResponderMode
peerAddr
DataFlowProtocolData
ByteString
m
[resp]
[req]
_ ->
ConnectionManagerWithExpandedCtx
'InitiatorResponderMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
[req]
-> ConnectionId peerAddr -> m (OperationResult AbstractState)
forall (muxMode :: Mode) socket peerAddr handle handleError
(m :: * -> *).
(HasInitiator muxMode ~ 'True) =>
ConnectionManager muxMode socket peerAddr handle handleError m
-> ConnectionId peerAddr -> m (OperationResult AbstractState)
releaseOutboundConnection
ConnectionManagerWithExpandedCtx
'InitiatorResponderMode
socket
peerAddr
DataFlowProtocolData
UnversionedProtocol
ByteString
m
[resp]
[req]
connectionManager1
ConnectionId peerAddr
connId
Disconnected {} ->
String -> m (OperationResult AbstractState)
forall a. HasCallStack => String -> a
error String
"ibidirectionalExperiment: impossible happened")
(\Connected
peerAddr
(HandleWithExpandedCtx
'InitiatorResponderMode
peerAddr
DataFlowProtocolData
ByteString
m
[resp]
[req])
(HandlerError UnversionedProtocol)
connHandle ->
case Connected
peerAddr
(HandleWithExpandedCtx
'InitiatorResponderMode
peerAddr
DataFlowProtocolData
ByteString
m
[resp]
[req])
(HandlerError UnversionedProtocol)
connHandle of
Connected ConnectionId peerAddr
connId DataFlow
_ (Handle Mux 'InitiatorResponderMode m
mux OuroborosBundle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
[req]
muxBundle TemperatureBundle (StrictTVar m ControlMessage)
controlBundle DataFlowProtocolData
_) -> do
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try @_ @SomeException (m (TemperatureBundle [resp])
-> m (Either SomeException (TemperatureBundle [resp])))
-> m (TemperatureBundle [resp])
-> m (Either SomeException (TemperatureBundle [resp]))
forall a b. (a -> b) -> a -> b
$
SingMuxMode 'InitiatorResponderMode
-> Mux 'InitiatorResponderMode m
-> OuroborosBundle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
[req]
-> TemperatureBundle (StrictTVar m ControlMessage)
-> ConnectionId peerAddr
-> m (TemperatureBundle [resp])
forall (muxMode :: Mode) addr (m :: * -> *) a b.
(Alternative (STM m), MonadAsync m, MonadCatch m,
MonadLabelledSTM m, MonadMask m, MonadSTM m, MonadThrow (STM m),
HasInitiator muxMode ~ 'True, MonadSay m) =>
SingMuxMode muxMode
-> Mux muxMode m
-> OuroborosBundle
muxMode
(ExpandedInitiatorContext addr m)
(ResponderContext addr)
ByteString
m
a
b
-> TemperatureBundle (StrictTVar m ControlMessage)
-> ConnectionId addr
-> m (TemperatureBundle a)
runInitiatorProtocols
SingMuxMode 'InitiatorResponderMode
SingInitiatorResponderMode
Mux 'InitiatorResponderMode m
mux OuroborosBundle
'InitiatorResponderMode
(ExpandedInitiatorContext peerAddr m)
(ResponderContext peerAddr)
ByteString
m
[resp]
[req]
muxBundle TemperatureBundle (StrictTVar m ControlMessage)
controlBundle ConnectionId peerAddr
connId
Disconnected ConnectionId peerAddr
_ DisconnectionException (HandlerError UnversionedProtocol)
err ->
IOError -> m (Either SomeException (TemperatureBundle [resp]))
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (String -> IOError
userError (String -> IOError) -> String -> IOError
forall a b. (a -> b) -> a -> b
$ String
"bidirectionalExperiment: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ DisconnectionException (HandlerError UnversionedProtocol) -> String
forall a. Show a => a -> String
show DisconnectionException (HandlerError UnversionedProtocol)
err)
))
pure $
foldr
(\ (Either SomeException (TemperatureBundle [resp])
r, TemperatureBundle [resp]
expected) Property
acc ->
case Either SomeException (TemperatureBundle [resp])
r of
Left SomeException
err -> String -> Bool -> Property
forall prop. Testable prop => String -> prop -> Property
counterexample (SomeException -> String
forall a. Show a => a -> String
show SomeException
err) Bool
False
Right TemperatureBundle [resp]
a -> TemperatureBundle [resp]
a TemperatureBundle [resp] -> TemperatureBundle [resp] -> Property
forall a. (Eq a, Show a) => a -> a -> Property
=== TemperatureBundle [resp]
expected Property -> Property -> Property
forall prop1 prop2.
(Testable prop1, Testable prop2) =>
prop1 -> prop2 -> Property
.&&. Property
acc)
(property True)
(zip rs0 (expectedResult clientAndServerData0 clientAndServerData1))
.&&.
foldr
(\ (Either SomeException (TemperatureBundle [resp])
r, TemperatureBundle [resp]
expected) Property
acc ->
case Either SomeException (TemperatureBundle [resp])
r of
Left SomeException
err -> String -> Bool -> Property
forall prop. Testable prop => String -> prop -> Property
counterexample (SomeException -> String
forall a. Show a => a -> String
show SomeException
err) Bool
False
Right TemperatureBundle [resp]
a -> TemperatureBundle [resp]
a TemperatureBundle [resp] -> TemperatureBundle [resp] -> Property
forall a. (Eq a, Show a) => a -> a -> Property
=== TemperatureBundle [resp]
expected Property -> Property -> Property
forall prop1 prop2.
(Testable prop1, Testable prop2) =>
prop1 -> prop2 -> Property
.&&. Property
acc)
(property True)
(zip rs1 (expectedResult clientAndServerData1 clientAndServerData0))
))
withLock :: ( MonadSTM m
, MonadThrow m
)
=> Bool
-> StrictTMVar m ()
-> m a
-> m a
withLock :: forall (m :: * -> *) a.
(MonadSTM m, MonadThrow m) =>
Bool -> StrictTMVar m () -> m a -> m a
withLock Bool
False StrictTMVar m ()
_v m a
m = m a
m
withLock Bool
True StrictTMVar m ()
v m a
m =
m () -> (() -> m ()) -> (() -> m a) -> m a
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket (STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTMVar m () -> STM m ()
forall (m :: * -> *) a. MonadSTM m => StrictTMVar m a -> STM m a
takeTMVar StrictTMVar m ()
v)
(STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> (() -> STM m ()) -> () -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StrictTMVar m () -> () -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTMVar m a -> a -> STM m ()
putTMVar StrictTMVar m ()
v)
(m a -> () -> m a
forall a b. a -> b -> a
const m a
m)