{-# LANGUAGE BlockArguments           #-}
{-# LANGUAGE CPP                      #-}
{-# LANGUAGE ConstraintKinds          #-}
{-# LANGUAGE DataKinds                #-}
{-# LANGUAGE DisambiguateRecordFields #-}
{-# LANGUAGE FlexibleContexts         #-}
{-# LANGUAGE GADTs                    #-}
{-# LANGUAGE LambdaCase               #-}
{-# LANGUAGE NamedFieldPuns           #-}
{-# LANGUAGE PolyKinds                #-}
{-# LANGUAGE RankNTypes               #-}
{-# LANGUAGE ScopedTypeVariables      #-}
{-# LANGUAGE TupleSections            #-}
{-# LANGUAGE TypeApplications         #-}
{-# LANGUAGE TypeOperators            #-}

-- for 'debugTracer'
{-# OPTIONS_GHC -Wno-redundant-constraints #-}
#if __GLASGOW_HASKELL__ >= 908
{-# OPTIONS_GHC -Wno-x-partial #-}
#endif

-- | This module contains experiments which can be executed either in `IO` or
-- in `IOSim`.
--
module Test.Ouroboros.Network.ConnectionManager.Experiments
  ( ClientAndServerData (..)
  , unidirectionalExperiment
  , bidirectionalExperiment
  , ConnectionManagerMonad
  , withInitiatorOnlyConnectionManager
  , withBidirectionalConnectionManager
  , runInitiatorProtocols
  , oneshotNextRequests
  , WithNameAndBearer
  ) where

import Control.Applicative (Alternative)
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Exception (AssertionFailed)
import Control.Monad (replicateM, (>=>))
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadSay
import Control.Monad.Class.MonadST (MonadST)
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadTimer.SI
import Control.Monad.Fix (MonadFix)
import Control.Tracer (Tracer (..), contramap, nullTracer)

import Codec.Serialise.Class (Serialise)
import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy qualified as LBS
import Data.Functor (($>), (<&>))
import Data.Functor.Compose
import Data.Hashable
import Data.List (mapAccumL)
import Data.List.NonEmpty (NonEmpty (..))
import Data.Proxy (Proxy (..))
import Data.Typeable (Typeable)
import Data.Void (Void)

import System.Random (StdGen)
import System.Random qualified as Random

import Test.QuickCheck

import Codec.CBOR.Term (Term)

import Network.Mux qualified as Mx
import Network.TypedProtocol.Core
import Network.TypedProtocol.Peer.Client

import Network.TypedProtocol.ReqResp.Client
import Network.TypedProtocol.ReqResp.Codec.CBOR
import Network.TypedProtocol.ReqResp.Examples
import Network.TypedProtocol.ReqResp.Server
import Network.TypedProtocol.ReqResp.Type as ReqResp

import Ouroboros.Network.ConnectionHandler
import Ouroboros.Network.ConnectionId
import Ouroboros.Network.ConnectionManager.Core qualified as CM
import Ouroboros.Network.ConnectionManager.State qualified as CM
import Ouroboros.Network.ConnectionManager.Types
import Ouroboros.Network.Context
import Ouroboros.Network.ControlMessage
import Ouroboros.Network.Driver.Limits
import Ouroboros.Network.InboundGovernor qualified as InboundGovernor
import Ouroboros.Network.InboundGovernor.InformationChannel
           (newInformationChannel)
import Ouroboros.Network.Mux
import Ouroboros.Network.MuxMode
import Ouroboros.Network.NodeToNode.Version (DiffusionMode (..))
import Ouroboros.Network.Protocol.Handshake
import Ouroboros.Network.Protocol.Handshake.Unversioned
import Ouroboros.Network.RethrowPolicy
import Ouroboros.Network.Server (RemoteTransitionTrace)
import Ouroboros.Network.Server qualified as Server
import Ouroboros.Network.Server.RateLimiting (AcceptedConnectionsLimit (..))
import Ouroboros.Network.Snocket (Snocket)
import Ouroboros.Network.Snocket qualified as Snocket

import Test.Ouroboros.Network.ConnectionManager.Timeouts
import Test.Ouroboros.Network.Orphans ()
import Test.Ouroboros.Network.Utils (WithName (..))


--
-- Server tests
--

-- | The protocol will run three instances of  `ReqResp` protocol; one for each
-- state: warm, hot and established.
--
data ClientAndServerData req = ClientAndServerData {
    forall req. ClientAndServerData req -> req
accumulatorInit              :: req,
    -- ^ Initial value. In for each request the server sends back a list received requests (in
    --   reverse order) terminating with the accumulatorInit.
    forall req. ClientAndServerData req -> [[req]]
hotInitiatorRequests         :: [[req]],
    -- ^ list of requests run by the hot initiator in each round; Running
    -- multiple rounds allows us to test restarting of responders.
    forall req. ClientAndServerData req -> [[req]]
warmInitiatorRequests        :: [[req]],
    -- ^ list of requests run by the warm initiator in each round
    forall req. ClientAndServerData req -> [[req]]
establishedInitiatorRequests :: [[req]]
    -- ^ list of requests run by the established initiator in each round
  }
  deriving Int -> ClientAndServerData req -> ShowS
[ClientAndServerData req] -> ShowS
ClientAndServerData req -> String
(Int -> ClientAndServerData req -> ShowS)
-> (ClientAndServerData req -> String)
-> ([ClientAndServerData req] -> ShowS)
-> Show (ClientAndServerData req)
forall req. Show req => Int -> ClientAndServerData req -> ShowS
forall req. Show req => [ClientAndServerData req] -> ShowS
forall req. Show req => ClientAndServerData req -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: forall req. Show req => Int -> ClientAndServerData req -> ShowS
showsPrec :: Int -> ClientAndServerData req -> ShowS
$cshow :: forall req. Show req => ClientAndServerData req -> String
show :: ClientAndServerData req -> String
$cshowList :: forall req. Show req => [ClientAndServerData req] -> ShowS
showList :: [ClientAndServerData req] -> ShowS
Show


-- Number of rounds to exhaust all the requests.
--
numberOfRounds :: ClientAndServerData req ->  Int
numberOfRounds :: forall req. ClientAndServerData req -> Int
numberOfRounds ClientAndServerData {
                  [[req]]
hotInitiatorRequests :: forall req. ClientAndServerData req -> [[req]]
hotInitiatorRequests :: [[req]]
hotInitiatorRequests,
                  [[req]]
warmInitiatorRequests :: forall req. ClientAndServerData req -> [[req]]
warmInitiatorRequests :: [[req]]
warmInitiatorRequests,
                  [[req]]
establishedInitiatorRequests :: forall req. ClientAndServerData req -> [[req]]
establishedInitiatorRequests :: [[req]]
establishedInitiatorRequests
                } =
    [[req]] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [[req]]
hotInitiatorRequests
    Int -> Int -> Int
forall a. Ord a => a -> a -> a
`max`
    [[req]] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [[req]]
warmInitiatorRequests
    Int -> Int -> Int
forall a. Ord a => a -> a -> a
`max`
    [[req]] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [[req]]
establishedInitiatorRequests


-- | We use it to generate a list of messages for a list of rounds.  In each
-- round we connect to the same server, and run 'ReqResp' protocol.
--
arbitraryList :: Arbitrary a =>  Gen [[a]]
arbitraryList :: forall a. Arbitrary a => Gen [[a]]
arbitraryList =
    Int -> Gen [[a]] -> Gen [[a]]
forall a. HasCallStack => Int -> Gen a -> Gen a
resize Int
3 (Gen [a] -> Gen [[a]]
forall a. Gen a -> Gen [a]
listOf (Int -> Gen [a] -> Gen [a]
forall a. HasCallStack => Int -> Gen a -> Gen a
resize Int
3 (Gen a -> Gen [a]
forall a. Gen a -> Gen [a]
listOf (Int -> Gen a -> Gen a
forall a. HasCallStack => Int -> Gen a -> Gen a
resize Int
100 Gen a
forall a. Arbitrary a => Gen a
arbitrary))))

instance Arbitrary req => Arbitrary (ClientAndServerData req) where
    arbitrary :: Gen (ClientAndServerData req)
arbitrary =
      req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
forall req.
req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
ClientAndServerData (req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req)
-> Gen req
-> Gen ([[req]] -> [[req]] -> [[req]] -> ClientAndServerData req)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Gen req
forall a. Arbitrary a => Gen a
arbitrary
                          Gen ([[req]] -> [[req]] -> [[req]] -> ClientAndServerData req)
-> Gen [[req]]
-> Gen ([[req]] -> [[req]] -> ClientAndServerData req)
forall a b. Gen (a -> b) -> Gen a -> Gen b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Gen [[req]]
forall a. Arbitrary a => Gen [[a]]
arbitraryList
                          Gen ([[req]] -> [[req]] -> ClientAndServerData req)
-> Gen [[req]] -> Gen ([[req]] -> ClientAndServerData req)
forall a b. Gen (a -> b) -> Gen a -> Gen b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Gen [[req]]
forall a. Arbitrary a => Gen [[a]]
arbitraryList
                          Gen ([[req]] -> ClientAndServerData req)
-> Gen [[req]] -> Gen (ClientAndServerData req)
forall a b. Gen (a -> b) -> Gen a -> Gen b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Gen [[req]]
forall a. Arbitrary a => Gen [[a]]
arbitraryList

    shrink :: ClientAndServerData req -> [ClientAndServerData req]
shrink (ClientAndServerData req
ini [[req]]
hot [[req]]
warm [[req]]
est) = [[ClientAndServerData req]] -> [ClientAndServerData req]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat
      [ req -> [req]
forall a. Arbitrary a => a -> [a]
shrink req
ini  [req]
-> (req -> ClientAndServerData req) -> [ClientAndServerData req]
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \ req
ini'  -> req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
forall req.
req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
ClientAndServerData req
ini' [[req]]
hot  [[req]]
warm  [[req]]
est
      , [[req]] -> [[[req]]]
forall a. Arbitrary a => a -> [a]
shrink [[req]]
hot  [[[req]]]
-> ([[req]] -> ClientAndServerData req)
-> [ClientAndServerData req]
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \ [[req]]
hot'  -> req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
forall req.
req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
ClientAndServerData req
ini  [[req]]
hot' [[req]]
warm  [[req]]
est
      , [[req]] -> [[[req]]]
forall a. Arbitrary a => a -> [a]
shrink [[req]]
warm [[[req]]]
-> ([[req]] -> ClientAndServerData req)
-> [ClientAndServerData req]
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \ [[req]]
warm' -> req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
forall req.
req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
ClientAndServerData req
ini  [[req]]
hot  [[req]]
warm' [[req]]
est
      , [[req]] -> [[[req]]]
forall a. Arbitrary a => a -> [a]
shrink [[req]]
est  [[[req]]]
-> ([[req]] -> ClientAndServerData req)
-> [ClientAndServerData req]
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \ [[req]]
est'  -> req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
forall req.
req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
ClientAndServerData req
ini  [[req]]
hot  [[req]]
warm  [[req]]
est'
      ]

expectedResult :: ClientAndServerData req
               -> ClientAndServerData req
               -> [TemperatureBundle [[req]]]
expectedResult :: forall req.
ClientAndServerData req
-> ClientAndServerData req -> [TemperatureBundle [[req]]]
expectedResult client :: ClientAndServerData req
client@ClientAndServerData
                                   { [[req]]
hotInitiatorRequests :: forall req. ClientAndServerData req -> [[req]]
hotInitiatorRequests :: [[req]]
hotInitiatorRequests
                                   , [[req]]
warmInitiatorRequests :: forall req. ClientAndServerData req -> [[req]]
warmInitiatorRequests :: [[req]]
warmInitiatorRequests
                                   , [[req]]
establishedInitiatorRequests :: forall req. ClientAndServerData req -> [[req]]
establishedInitiatorRequests :: [[req]]
establishedInitiatorRequests
                                   }
               ClientAndServerData { req
accumulatorInit :: forall req. ClientAndServerData req -> req
accumulatorInit :: req
accumulatorInit
                                   } =
    [[req]] -> [[req]] -> [[req]] -> [TemperatureBundle [[req]]]
go
      (Int -> [[req]] -> [[req]]
forall a. Int -> [a] -> [a]
take Int
rounds ([[req]] -> [[req]]) -> [[req]] -> [[req]]
forall a b. (a -> b) -> a -> b
$ [[req]]
hotInitiatorRequests         [[req]] -> [[req]] -> [[req]]
forall a. [a] -> [a] -> [a]
++ [req] -> [[req]]
forall a. a -> [a]
repeat [])
      (Int -> [[req]] -> [[req]]
forall a. Int -> [a] -> [a]
take Int
rounds ([[req]] -> [[req]]) -> [[req]] -> [[req]]
forall a b. (a -> b) -> a -> b
$ [[req]]
warmInitiatorRequests        [[req]] -> [[req]] -> [[req]]
forall a. [a] -> [a] -> [a]
++ [req] -> [[req]]
forall a. a -> [a]
repeat [])
      (Int -> [[req]] -> [[req]]
forall a. Int -> [a] -> [a]
take Int
rounds ([[req]] -> [[req]]) -> [[req]] -> [[req]]
forall a b. (a -> b) -> a -> b
$ [[req]]
establishedInitiatorRequests [[req]] -> [[req]] -> [[req]]
forall a. [a] -> [a] -> [a]
++ [req] -> [[req]]
forall a. a -> [a]
repeat [])
  where
    rounds :: Int
rounds = ClientAndServerData req -> Int
forall req. ClientAndServerData req -> Int
numberOfRounds ClientAndServerData req
client
    fn :: [a] -> a -> ([a], [a])
fn [a]
acc a
x = (a
x a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
acc, a
x a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
acc)
    go :: [[req]] -> [[req]] -> [[req]] -> [TemperatureBundle [[req]]]
go ([req]
a : [[req]]
as) ([req]
b : [[req]]
bs) ([req]
c : [[req]]
cs) =
      WithProtocolTemperature 'Hot [[req]]
-> WithProtocolTemperature 'Warm [[req]]
-> WithProtocolTemperature 'Established [[req]]
-> TemperatureBundle [[req]]
forall a.
WithProtocolTemperature 'Hot a
-> WithProtocolTemperature 'Warm a
-> WithProtocolTemperature 'Established a
-> TemperatureBundle a
TemperatureBundle
        ([[req]] -> WithProtocolTemperature 'Hot [[req]]
forall a. a -> WithProtocolTemperature 'Hot a
WithHot         (([req], [[req]]) -> [[req]]
forall a b. (a, b) -> b
snd (([req], [[req]]) -> [[req]]) -> ([req], [[req]]) -> [[req]]
forall a b. (a -> b) -> a -> b
$ ([req] -> req -> ([req], [req]))
-> [req] -> [req] -> ([req], [[req]])
forall (t :: * -> *) s a b.
Traversable t =>
(s -> a -> (s, b)) -> s -> t a -> (s, t b)
mapAccumL [req] -> req -> ([req], [req])
forall {a}. [a] -> a -> ([a], [a])
fn [req
accumulatorInit] [req]
a))
        ([[req]] -> WithProtocolTemperature 'Warm [[req]]
forall a. a -> WithProtocolTemperature 'Warm a
WithWarm        (([req], [[req]]) -> [[req]]
forall a b. (a, b) -> b
snd (([req], [[req]]) -> [[req]]) -> ([req], [[req]]) -> [[req]]
forall a b. (a -> b) -> a -> b
$ ([req] -> req -> ([req], [req]))
-> [req] -> [req] -> ([req], [[req]])
forall (t :: * -> *) s a b.
Traversable t =>
(s -> a -> (s, b)) -> s -> t a -> (s, t b)
mapAccumL [req] -> req -> ([req], [req])
forall {a}. [a] -> a -> ([a], [a])
fn [req
accumulatorInit] [req]
b))
        ([[req]] -> WithProtocolTemperature 'Established [[req]]
forall a. a -> WithProtocolTemperature 'Established a
WithEstablished (([req], [[req]]) -> [[req]]
forall a b. (a, b) -> b
snd (([req], [[req]]) -> [[req]]) -> ([req], [[req]]) -> [[req]]
forall a b. (a -> b) -> a -> b
$ ([req] -> req -> ([req], [req]))
-> [req] -> [req] -> ([req], [[req]])
forall (t :: * -> *) s a b.
Traversable t =>
(s -> a -> (s, b)) -> s -> t a -> (s, t b)
mapAccumL [req] -> req -> ([req], [req])
forall {a}. [a] -> a -> ([a], [a])
fn [req
accumulatorInit] [req]
c))
      TemperatureBundle [[req]]
-> [TemperatureBundle [[req]]] -> [TemperatureBundle [[req]]]
forall a. a -> [a] -> [a]
: [[req]] -> [[req]] -> [[req]] -> [TemperatureBundle [[req]]]
go [[req]]
as [[req]]
bs [[req]]
cs
    go [] [] [] = []
    go [[req]]
_  [[req]]
_  [[req]]
_  = String -> [TemperatureBundle [[req]]]
forall a. HasCallStack => String -> a
error String
"expectedResult: impossible happened"

noNextRequests :: forall stm req peerAddr. Applicative stm => TemperatureBundle (ConnectionId peerAddr -> stm [req])
noNextRequests :: forall (stm :: * -> *) req peerAddr.
Applicative stm =>
TemperatureBundle (ConnectionId peerAddr -> stm [req])
noNextRequests = (ConnectionId peerAddr -> stm [req])
-> TemperatureBundle (ConnectionId peerAddr -> stm [req])
forall a. a -> TemperatureBundle a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((ConnectionId peerAddr -> stm [req])
 -> TemperatureBundle (ConnectionId peerAddr -> stm [req]))
-> (ConnectionId peerAddr -> stm [req])
-> TemperatureBundle (ConnectionId peerAddr -> stm [req])
forall a b. (a -> b) -> a -> b
$ \ConnectionId peerAddr
_ -> [req] -> stm [req]
forall a. a -> stm a
forall (f :: * -> *) a. Applicative f => a -> f a
pure []

-- | Next requests bundle for bidirectional and unidirectional experiments.
oneshotNextRequests
  :: forall req peerAddr m. MonadSTM m
  => ClientAndServerData req
  -> m (TemperatureBundle (ConnectionId peerAddr -> STM m [req]))
oneshotNextRequests :: forall req peerAddr (m :: * -> *).
MonadSTM m =>
ClientAndServerData req
-> m (TemperatureBundle (ConnectionId peerAddr -> STM m [req]))
oneshotNextRequests ClientAndServerData {
                      [[req]]
hotInitiatorRequests :: forall req. ClientAndServerData req -> [[req]]
hotInitiatorRequests :: [[req]]
hotInitiatorRequests,
                      [[req]]
warmInitiatorRequests :: forall req. ClientAndServerData req -> [[req]]
warmInitiatorRequests :: [[req]]
warmInitiatorRequests,
                      [[req]]
establishedInitiatorRequests :: forall req. ClientAndServerData req -> [[req]]
establishedInitiatorRequests :: [[req]]
establishedInitiatorRequests
                    } = do
    -- we pass a `StrictTVar` with all the requests to each initiator.  This way
    -- the each round (which runs a single instance of `ReqResp` protocol) will
    -- use its own request list.
    hotRequestsVar         <- [[req]] -> m (StrictTVar m [[req]])
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO [[req]]
hotInitiatorRequests
    warmRequestsVar        <- newTVarIO warmInitiatorRequests
    establishedRequestsVar <- newTVarIO establishedInitiatorRequests
    return $ TemperatureBundle
               (WithHot hotRequestsVar)
               (WithWarm warmRequestsVar)
               (WithEstablished establishedRequestsVar)
              <&> \ StrictTVar m [[req]]
reqVar ConnectionId peerAddr
_ -> StrictTVar m [[req]] -> STM m [req]
forall {m :: * -> *} {a}.
MonadSTM m =>
StrictTVar m [[a]] -> STM m [a]
popRequests StrictTVar m [[req]]
reqVar
  where
    popRequests :: StrictTVar m [[a]] -> STM m [a]
popRequests StrictTVar m [[a]]
requestsVar = do
      requests <- StrictTVar m [[a]] -> STM m [[a]]
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m [[a]]
requestsVar
      case requests of
        [a]
reqs : [[a]]
rest -> StrictTVar m [[a]] -> [[a]] -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m [[a]]
requestsVar [[a]]
rest STM m () -> [a] -> STM m [a]
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> [a]
reqs
        []          -> [a] -> STM m [a]
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure []


--
-- Various ConnectionManagers
--

type ConnectionManagerMonad m =
       ( Alternative (STM m), MonadAsync m, MonadCatch m, MonadEvaluate m,
         MonadFork m, MonadMask  m, MonadST m, MonadTime m, MonadTimer m,
         MonadThrow m, MonadThrow (STM m)
       )


withInitiatorOnlyConnectionManager
    :: forall name peerAddr socket req resp m a.
       ( ConnectionManagerMonad m

       , resp ~ [req]
       , Ord peerAddr, Show peerAddr, Typeable peerAddr
       , Serialise req, Typeable req
       , MonadAsync m
       , MonadDelay m
       , MonadFix m
       , MonadLabelledSTM m
       , MonadTraceSTM m
       , MonadSay m, Show req
       , Show name
       )
    => name
    -- ^ identifier (for logging)
    -> Timeouts
    -> Tracer m (WithName name (AbstractTransitionTrace CM.ConnStateId))
    -> Tracer m (WithName name
                          (CM.Trace
                            peerAddr
                            (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
    -> StdGen
    -> Snocket m socket peerAddr
    -> Mx.MakeBearer m socket
    -- ^ series of request possible to do with the bidirectional connection
    -- manager towards some peer.
    -> CM.ConnStateIdSupply m
    -> Maybe peerAddr
    -> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
    -- ^ Functions to get the next requests for a given connection
    -> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
    -- ^ Handshake time limits
    -> AcceptedConnectionsLimit
    -> (ConnectionManagerWithExpandedCtx
          Mx.InitiatorMode socket peerAddr
          DataFlowProtocolData UnversionedProtocol ByteString m [resp] Void
       -> m a)
    -> m a
withInitiatorOnlyConnectionManager :: forall name peerAddr socket req resp (m :: * -> *) a.
(ConnectionManagerMonad m, resp ~ [req], Ord peerAddr,
 Show peerAddr, Typeable peerAddr, Serialise req, Typeable req,
 MonadAsync m, MonadDelay m, MonadFix m, MonadLabelledSTM m,
 MonadTraceSTM m, MonadSay m, Show req, Show name) =>
name
-> Timeouts
-> Tracer m (WithName name (AbstractTransitionTrace ConnStateId))
-> Tracer
     m
     (WithName
        name
        (Trace
           peerAddr
           (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
-> StdGen
-> Snocket m socket peerAddr
-> MakeBearer m socket
-> ConnStateIdSupply m
-> Maybe peerAddr
-> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
-> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
-> AcceptedConnectionsLimit
-> (ConnectionManagerWithExpandedCtx
      'InitiatorMode
      socket
      peerAddr
      DataFlowProtocolData
      UnversionedProtocol
      ByteString
      m
      [resp]
      Void
    -> m a)
-> m a
withInitiatorOnlyConnectionManager name
name Timeouts
timeouts Tracer m (WithName name (AbstractTransitionTrace ConnStateId))
trTracer Tracer
  m
  (WithName
     name
     (Trace
        peerAddr
        (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
tracer StdGen
stdGen Snocket m socket peerAddr
snocket MakeBearer m socket
makeBearer ConnStateIdSupply m
connStateIdSupply
                                   Maybe peerAddr
localAddr TemperatureBundle (ConnectionId peerAddr -> STM m [req])
nextRequests ProtocolTimeLimits (Handshake UnversionedProtocol Term)
handshakeTimeLimits AcceptedConnectionsLimit
acceptedConnLimit ConnectionManagerWithExpandedCtx
  'InitiatorMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  Void
-> m a
k = do
  mainThreadId <- m (ThreadId m)
forall (m :: * -> *). MonadThread m => m (ThreadId m)
myThreadId
  let muxTracers :: Mx.TracersWithBearer (ConnectionId peerAddr) m
      muxTracers = Mx.Tracers {
        tracer :: Tracer m (WithBearer (ConnectionId peerAddr) Trace)
Mx.tracer        = name
-> WithBearer (ConnectionId peerAddr) Trace
-> WithName name (WithBearer (ConnectionId peerAddr) Trace)
forall name event. name -> event -> WithName name event
WithName name
name (WithBearer (ConnectionId peerAddr) Trace
 -> WithName name (WithBearer (ConnectionId peerAddr) Trace))
-> Tracer
     m (WithName name (WithBearer (ConnectionId peerAddr) Trace))
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace)
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer m (WithName name (WithBearer (ConnectionId peerAddr) Trace))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer,
        channelTracer :: Tracer m (WithBearer (ConnectionId peerAddr) ChannelTrace)
Mx.channelTracer = name
-> WithBearer (ConnectionId peerAddr) ChannelTrace
-> WithName name (WithBearer (ConnectionId peerAddr) ChannelTrace)
forall name event. name -> event -> WithName name event
WithName name
name (WithBearer (ConnectionId peerAddr) ChannelTrace
 -> WithName name (WithBearer (ConnectionId peerAddr) ChannelTrace))
-> Tracer
     m (WithName name (WithBearer (ConnectionId peerAddr) ChannelTrace))
-> Tracer m (WithBearer (ConnectionId peerAddr) ChannelTrace)
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer
  m (WithName name (WithBearer (ConnectionId peerAddr) ChannelTrace))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer,
        bearerTracer :: Tracer m (WithBearer (ConnectionId peerAddr) BearerTrace)
Mx.bearerTracer  = name
-> WithBearer (ConnectionId peerAddr) BearerTrace
-> WithName name (WithBearer (ConnectionId peerAddr) BearerTrace)
forall name event. name -> event -> WithName name event
WithName name
name (WithBearer (ConnectionId peerAddr) BearerTrace
 -> WithName name (WithBearer (ConnectionId peerAddr) BearerTrace))
-> Tracer
     m (WithName name (WithBearer (ConnectionId peerAddr) BearerTrace))
-> Tracer m (WithBearer (ConnectionId peerAddr) BearerTrace)
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer
  m (WithName name (WithBearer (ConnectionId peerAddr) BearerTrace))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
      }
      mkConnectionHandler =
        TracersWithBearer (ConnectionId peerAddr) m
-> ForkPolicy peerAddr
-> HandshakeArguments
     (ConnectionId peerAddr) UnversionedProtocol DataFlowProtocolData m
-> Versions
     UnversionedProtocol
     DataFlowProtocolData
     (OuroborosBundle
        'InitiatorMode
        (ExpandedInitiatorContext peerAddr m)
        (ResponderContext peerAddr)
        ByteString
        m
        [resp]
        Void)
-> (ThreadId m, RethrowPolicy)
-> MkMuxConnectionHandler
     'InitiatorMode
     socket
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     peerAddr
     UnversionedProtocol
     DataFlowProtocolData
     ByteString
     m
     [resp]
     Void
-> MuxConnectionHandler
     'InitiatorMode
     socket
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     peerAddr
     UnversionedProtocol
     DataFlowProtocolData
     ByteString
     m
     [resp]
     Void
forall initiatorCtx responderCtx peerAddr (muxMode :: Mode) socket
       versionNumber versionData (m :: * -> *) a b.
(Alternative (STM m), MonadAsync m, MonadDelay m, MonadFork m,
 MonadLabelledSTM m, MonadThrow (STM m), MonadTimer m, MonadMask m,
 Ord versionNumber, Show peerAddr, Typeable peerAddr) =>
TracersWithBearer (ConnectionId peerAddr) m
-> ForkPolicy peerAddr
-> HandshakeArguments
     (ConnectionId peerAddr) versionNumber versionData m
-> Versions
     versionNumber
     versionData
     (OuroborosBundle
        muxMode initiatorCtx responderCtx ByteString m a b)
-> (ThreadId m, RethrowPolicy)
-> MkMuxConnectionHandler
     muxMode
     socket
     initiatorCtx
     responderCtx
     peerAddr
     versionNumber
     versionData
     ByteString
     m
     a
     b
-> MuxConnectionHandler
     muxMode
     socket
     initiatorCtx
     responderCtx
     peerAddr
     versionNumber
     versionData
     ByteString
     m
     a
     b
makeConnectionHandler
          TracersWithBearer (ConnectionId peerAddr) m
muxTracers
          ForkPolicy peerAddr
forall peerAddr. ForkPolicy peerAddr
noBindForkPolicy
          HandshakeArguments {
              -- TraceSendRecv
              haHandshakeTracer :: Tracer
  m
  (WithBearer
     (ConnectionId peerAddr)
     (TraceSendRecv (Handshake UnversionedProtocol Term)))
haHandshakeTracer = name
-> WithBearer
     (ConnectionId peerAddr)
     (TraceSendRecv (Handshake UnversionedProtocol Term))
-> WithName
     name
     (WithBearer
        (ConnectionId peerAddr)
        (TraceSendRecv (Handshake UnversionedProtocol Term)))
forall name event. name -> event -> WithName name event
WithName name
name (WithBearer
   (ConnectionId peerAddr)
   (TraceSendRecv (Handshake UnversionedProtocol Term))
 -> WithName
      name
      (WithBearer
         (ConnectionId peerAddr)
         (TraceSendRecv (Handshake UnversionedProtocol Term))))
-> Tracer
     m
     (WithName
        name
        (WithBearer
           (ConnectionId peerAddr)
           (TraceSendRecv (Handshake UnversionedProtocol Term))))
-> Tracer
     m
     (WithBearer
        (ConnectionId peerAddr)
        (TraceSendRecv (Handshake UnversionedProtocol Term)))
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer
  m
  (WithName
     name
     (WithBearer
        (ConnectionId peerAddr)
        (TraceSendRecv (Handshake UnversionedProtocol Term))))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer,
              haBearerTracer :: Tracer m (WithBearer (ConnectionId peerAddr) BearerTrace)
haBearerTracer = name
-> WithBearer (ConnectionId peerAddr) BearerTrace
-> WithName name (WithBearer (ConnectionId peerAddr) BearerTrace)
forall name event. name -> event -> WithName name event
WithName name
name (WithBearer (ConnectionId peerAddr) BearerTrace
 -> WithName name (WithBearer (ConnectionId peerAddr) BearerTrace))
-> Tracer
     m (WithName name (WithBearer (ConnectionId peerAddr) BearerTrace))
-> Tracer m (WithBearer (ConnectionId peerAddr) BearerTrace)
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer
  m (WithName name (WithBearer (ConnectionId peerAddr) BearerTrace))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer,
              haHandshakeCodec :: Codec
  (Handshake UnversionedProtocol Term)
  DeserialiseFailure
  m
  ByteString
haHandshakeCodec = Codec
  (Handshake UnversionedProtocol Term)
  DeserialiseFailure
  m
  ByteString
forall (m :: * -> *).
MonadST m =>
Codec
  (Handshake UnversionedProtocol Term)
  DeserialiseFailure
  m
  ByteString
unversionedHandshakeCodec,
              haVersionDataCodec :: VersionDataCodec Term UnversionedProtocol DataFlowProtocolData
haVersionDataCodec = (UnversionedProtocol -> CodecCBORTerm Text DataFlowProtocolData)
-> VersionDataCodec Term UnversionedProtocol DataFlowProtocolData
forall vNumber vData.
(vNumber -> CodecCBORTerm Text vData)
-> VersionDataCodec Term vNumber vData
cborTermVersionDataCodec UnversionedProtocol -> CodecCBORTerm Text DataFlowProtocolData
dataFlowProtocolDataCodec,
              haAcceptVersion :: DataFlowProtocolData
-> DataFlowProtocolData -> Accept DataFlowProtocolData
haAcceptVersion = DataFlowProtocolData
-> DataFlowProtocolData -> Accept DataFlowProtocolData
forall v. Acceptable v => v -> v -> Accept v
acceptableVersion,
              haQueryVersion :: DataFlowProtocolData -> Bool
haQueryVersion = DataFlowProtocolData -> Bool
forall v. Queryable v => v -> Bool
queryVersion,
              haTimeLimits :: ProtocolTimeLimits (Handshake UnversionedProtocol Term)
haTimeLimits = ProtocolTimeLimits (Handshake UnversionedProtocol Term)
handshakeTimeLimits
            }
          (DataFlow
-> OuroborosBundle
     'InitiatorMode
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     ByteString
     m
     [resp]
     Void
-> Versions
     UnversionedProtocol
     DataFlowProtocolData
     (OuroborosBundle
        'InitiatorMode
        (ExpandedInitiatorContext peerAddr m)
        (ResponderContext peerAddr)
        ByteString
        m
        [resp]
        Void)
forall app.
DataFlow
-> app -> Versions UnversionedProtocol DataFlowProtocolData app
dataFlowProtocol DataFlow
Unidirectional OuroborosBundle
  'InitiatorMode
  (ExpandedInitiatorContext peerAddr m)
  (ResponderContext peerAddr)
  ByteString
  m
  [resp]
  Void
clientApplication)
          (ThreadId m
mainThreadId,   RethrowPolicy
debugMuxErrorRethrowPolicy
                        RethrowPolicy -> RethrowPolicy -> RethrowPolicy
forall a. Semigroup a => a -> a -> a
<> RethrowPolicy
debugMuxRuntimeErrorRethrowPolicy
                        RethrowPolicy -> RethrowPolicy -> RethrowPolicy
forall a. Semigroup a => a -> a -> a
<> RethrowPolicy
debugIOErrorRethrowPolicy
                        RethrowPolicy -> RethrowPolicy -> RethrowPolicy
forall a. Semigroup a => a -> a -> a
<> RethrowPolicy
assertRethrowPolicy)
          MkMuxConnectionHandler
  'InitiatorMode
  socket
  (ExpandedInitiatorContext peerAddr m)
  (ResponderContext peerAddr)
  peerAddr
  UnversionedProtocol
  DataFlowProtocolData
  ByteString
  m
  [resp]
  Void
forall socket initiatorCtx responderCtx peerAddr versionNumber
       versionData bytes (m :: * -> *) a b.
MkMuxConnectionHandler
  'InitiatorMode
  socket
  initiatorCtx
  responderCtx
  peerAddr
  versionNumber
  versionData
  bytes
  m
  a
  b
MuxInitiatorConnectionHandler


  CM.with CM.Arguments {
    -- ConnectionManagerTrace
    tracer    = WithName name
                  `contramap` tracer,
    trTracer  = (WithName name . fmap CM.abstractState)
                  `contramap` trTracer,
    -- This is actually the low level bearer tracer
    ipv4Address  = localAddr,
    ipv6Address  = Nothing,
    addressType  = \peerAddr
_ -> AddressType -> Maybe AddressType
forall a. a -> Maybe a
Just AddressType
IPv4Address,
    snocket,
    makeBearer,
    withBuffer = \Maybe (ReadBuffer m) -> m ()
f -> Maybe (ReadBuffer m) -> m ()
f Maybe (ReadBuffer m)
forall a. Maybe a
Nothing,
    configureSocket = \socket
_ Maybe peerAddr
_ -> () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (),
    connectionDataFlow = \(DataFlowProtocolData DataFlow
df PeerSharing
_) -> DataFlow
df,
    timeWaitTimeout = tTimeWaitTimeout timeouts,
    outboundIdleTimeout = tOutboundIdleTimeout timeouts,
    prunePolicy = simplePrunePolicy,
    stdGen,
    connectionsLimits = acceptedConnLimit,
    updateVersionData = \DataFlowProtocolData
a DiffusionMode
_ -> DataFlowProtocolData
a,
    connStateIdSupply,
    classifyHandlerError = \HandlerError UnversionedProtocol
_ -> HandlerErrorType
HandshakeFailure
    }
    NotInResponderMode
    mkConnectionHandler
    \ConnectionManagerWithExpandedCtx
  'InitiatorMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  Void
cm ->
      ConnectionManagerWithExpandedCtx
  'InitiatorMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  Void
-> m a
k ConnectionManagerWithExpandedCtx
  'InitiatorMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  Void
cm m a -> (SomeException -> m a) -> m a
forall e a. Exception e => m a -> (e -> m a) -> m a
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` \(SomeException
e :: SomeException) -> SomeException -> m a
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SomeException
e
  where
    clientApplication :: TemperatureBundle
                           [MiniProtocol Mx.InitiatorMode
                                         (ExpandedInitiatorContext peerAddr m)
                                         (ResponderContext peerAddr)
                                         ByteString m [resp] Void]
    clientApplication :: OuroborosBundle
  'InitiatorMode
  (ExpandedInitiatorContext peerAddr m)
  (ResponderContext peerAddr)
  ByteString
  m
  [resp]
  Void
clientApplication = MiniProtocolNum
-> (ConnectionId peerAddr -> STM m [req])
-> [MiniProtocol
      'InitiatorMode
      (ExpandedInitiatorContext peerAddr m)
      (ResponderContext peerAddr)
      ByteString
      m
      [resp]
      Void]
mkProto (MiniProtocolNum
 -> (ConnectionId peerAddr -> STM m [req])
 -> [MiniProtocol
       'InitiatorMode
       (ExpandedInitiatorContext peerAddr m)
       (ResponderContext peerAddr)
       ByteString
       m
       [resp]
       Void])
-> TemperatureBundle MiniProtocolNum
-> TemperatureBundle
     ((ConnectionId peerAddr -> STM m [req])
      -> [MiniProtocol
            'InitiatorMode
            (ExpandedInitiatorContext peerAddr m)
            (ResponderContext peerAddr)
            ByteString
            m
            [resp]
            Void])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Word16 -> MiniProtocolNum
Mx.MiniProtocolNum (Word16 -> MiniProtocolNum)
-> TemperatureBundle Word16 -> TemperatureBundle MiniProtocolNum
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TemperatureBundle Word16
nums)
                                TemperatureBundle
  ((ConnectionId peerAddr -> STM m [req])
   -> [MiniProtocol
         'InitiatorMode
         (ExpandedInitiatorContext peerAddr m)
         (ResponderContext peerAddr)
         ByteString
         m
         [resp]
         Void])
-> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
-> OuroborosBundle
     'InitiatorMode
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     ByteString
     m
     [resp]
     Void
forall a b.
TemperatureBundle (a -> b)
-> TemperatureBundle a -> TemperatureBundle b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
nextRequests

      where nums :: TemperatureBundle Word16
nums = WithProtocolTemperature 'Hot Word16
-> WithProtocolTemperature 'Warm Word16
-> WithProtocolTemperature 'Established Word16
-> TemperatureBundle Word16
forall a.
WithProtocolTemperature 'Hot a
-> WithProtocolTemperature 'Warm a
-> WithProtocolTemperature 'Established a
-> TemperatureBundle a
TemperatureBundle (Word16 -> WithProtocolTemperature 'Hot Word16
forall a. a -> WithProtocolTemperature 'Hot a
WithHot Word16
1) (Word16 -> WithProtocolTemperature 'Warm Word16
forall a. a -> WithProtocolTemperature 'Warm a
WithWarm Word16
2) (Word16 -> WithProtocolTemperature 'Established Word16
forall a. a -> WithProtocolTemperature 'Established a
WithEstablished Word16
3)
            mkProto :: MiniProtocolNum
-> (ConnectionId peerAddr -> STM m [req])
-> [MiniProtocol
      'InitiatorMode
      (ExpandedInitiatorContext peerAddr m)
      (ResponderContext peerAddr)
      ByteString
      m
      [resp]
      Void]
mkProto MiniProtocolNum
miniProtocolNum ConnectionId peerAddr -> STM m [req]
nextRequest =
              [MiniProtocol {
                  MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum,
                  miniProtocolStart :: StartOnDemandOrEagerly
miniProtocolStart  = StartOnDemandOrEagerly
StartOnDemand,
                  miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits = Int -> MiniProtocolLimits
Mx.MiniProtocolLimits Int
forall a. Bounded a => a
maxBound,
                  miniProtocolRun :: RunMiniProtocol
  'InitiatorMode
  (ExpandedInitiatorContext peerAddr m)
  (ResponderContext peerAddr)
  ByteString
  m
  [resp]
  Void
miniProtocolRun = MiniProtocolNum
-> (ConnectionId peerAddr -> STM m [req])
-> RunMiniProtocol
     'InitiatorMode
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     ByteString
     m
     [resp]
     Void
reqRespInitiator MiniProtocolNum
miniProtocolNum
                                                     ConnectionId peerAddr -> STM m [req]
nextRequest
                }]

    reqRespInitiator :: Mx.MiniProtocolNum
                     -> (ConnectionId peerAddr -> STM m [req])
                     -> RunMiniProtocol Mx.InitiatorMode
                                        (ExpandedInitiatorContext peerAddr m)
                                        (ResponderContext peerAddr)
                                        ByteString m [resp] Void
    reqRespInitiator :: MiniProtocolNum
-> (ConnectionId peerAddr -> STM m [req])
-> RunMiniProtocol
     'InitiatorMode
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     ByteString
     m
     [resp]
     Void
reqRespInitiator MiniProtocolNum
protocolNum ConnectionId peerAddr -> STM m [req]
nextRequest =
      MiniProtocolCb
  (ExpandedInitiatorContext peerAddr m) ByteString m [resp]
-> RunMiniProtocol
     'InitiatorMode
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     ByteString
     m
     [resp]
     Void
forall initiatorCtx bytes (m :: * -> *) a responderCtx.
MiniProtocolCb initiatorCtx bytes m a
-> RunMiniProtocol
     'InitiatorMode initiatorCtx responderCtx bytes m a Void
InitiatorProtocolOnly
        ((ExpandedInitiatorContext peerAddr m
 -> Channel m ByteString -> m ([resp], Maybe ByteString))
-> MiniProtocolCb
     (ExpandedInitiatorContext peerAddr m) ByteString m [resp]
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb ((ExpandedInitiatorContext peerAddr m
  -> Channel m ByteString -> m ([resp], Maybe ByteString))
 -> MiniProtocolCb
      (ExpandedInitiatorContext peerAddr m) ByteString m [resp])
-> (ExpandedInitiatorContext peerAddr m
    -> Channel m ByteString -> m ([resp], Maybe ByteString))
-> MiniProtocolCb
     (ExpandedInitiatorContext peerAddr m) ByteString m [resp]
forall a b. (a -> b) -> a -> b
$ \ExpandedInitiatorContext { eicConnectionId :: forall addr (m :: * -> *).
ExpandedInitiatorContext addr m -> ConnectionId addr
eicConnectionId = ConnectionId peerAddr
connId } Channel m ByteString
channel ->
           Tracer m (TraceSendRecv (ReqResp req resp))
-> Codec (ReqResp req resp) DeserialiseFailure m ByteString
-> ProtocolSizeLimits (ReqResp req resp) ByteString
-> ProtocolTimeLimits (ReqResp req resp)
-> Channel m ByteString
-> Peer (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp]
-> m ([resp], Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
       a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
 MonadTimer m, ShowProxy ps,
 forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
 Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runPeerWithLimits
             ((name, String, MiniProtocolNum)
-> TraceSendRecv (ReqResp req resp)
-> WithName
     (name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp))
forall name event. name -> event -> WithName name event
WithName (name
name,String
"Initiator",MiniProtocolNum
protocolNum) (TraceSendRecv (ReqResp req resp)
 -> WithName
      (name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp)))
-> Tracer
     m
     (WithName
        (name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp)))
-> Tracer m (TraceSendRecv (ReqResp req resp))
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer
  m
  (WithName
     (name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp)))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer)
             -- TraceSendRecv
             Codec (ReqResp req resp) DeserialiseFailure m ByteString
forall req resp (m :: * -> *).
(MonadST m, Serialise req, Serialise resp) =>
Codec (ReqResp req resp) DeserialiseFailure m ByteString
codecReqResp
             ProtocolSizeLimits (ReqResp req resp) ByteString
forall {k} {k1} (req :: k) (resp :: k1).
ProtocolSizeLimits (ReqResp req resp) ByteString
reqRespSizeLimits
             ProtocolTimeLimits (ReqResp req resp)
forall {k} {k1} (req :: k) (resp :: k1).
ProtocolTimeLimits (ReqResp req resp)
reqRespTimeLimits
             Channel m ByteString
channel
             (m (Peer
     (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp])
-> Peer (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp]
forall ps (pl :: IsPipelined) (st :: ps) (m :: * -> *) a.
m (Client ps pl st m a) -> Client ps pl st m a
Effect (m (Peer
      (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp])
 -> Peer
      (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp])
-> m (Peer
        (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp])
-> Peer (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp]
forall a b. (a -> b) -> a -> b
$ do
               reqs <- STM m [req] -> m [req]
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (ConnectionId peerAddr -> STM m [req]
nextRequest ConnectionId peerAddr
connId)
               pure $ reqRespClientPeer (reqRespClientMap reqs)))


--
-- Rethrow policies
--

debugMuxErrorRethrowPolicy :: RethrowPolicy
debugMuxErrorRethrowPolicy :: RethrowPolicy
debugMuxErrorRethrowPolicy =
    (ErrorContext -> Error -> ErrorCommand) -> RethrowPolicy
forall e.
Exception e =>
(ErrorContext -> e -> ErrorCommand) -> RethrowPolicy
mkRethrowPolicy ((ErrorContext -> Error -> ErrorCommand) -> RethrowPolicy)
-> (ErrorContext -> Error -> ErrorCommand) -> RethrowPolicy
forall a b. (a -> b) -> a -> b
$
    \ErrorContext
_ Error
e ->
        case Error
e of
          Mx.IOException {}  -> ErrorCommand
ShutdownPeer
          Mx.BearerClosed {} -> ErrorCommand
ShutdownPeer
          Error
Mx.SDUReadTimeout  -> ErrorCommand
ShutdownPeer
          Error
Mx.SDUWriteTimeout -> ErrorCommand
ShutdownPeer
          Error
_                  -> ErrorCommand
ShutdownNode

debugMuxRuntimeErrorRethrowPolicy :: RethrowPolicy
debugMuxRuntimeErrorRethrowPolicy :: RethrowPolicy
debugMuxRuntimeErrorRethrowPolicy =
    (ErrorContext -> RuntimeError -> ErrorCommand) -> RethrowPolicy
forall e.
Exception e =>
(ErrorContext -> e -> ErrorCommand) -> RethrowPolicy
mkRethrowPolicy ((ErrorContext -> RuntimeError -> ErrorCommand) -> RethrowPolicy)
-> (ErrorContext -> RuntimeError -> ErrorCommand) -> RethrowPolicy
forall a b. (a -> b) -> a -> b
$
      \ErrorContext
_ (RuntimeError
_ :: Mx.RuntimeError) -> ErrorCommand
ShutdownPeer

debugIOErrorRethrowPolicy :: RethrowPolicy
debugIOErrorRethrowPolicy :: RethrowPolicy
debugIOErrorRethrowPolicy =
    (ErrorContext -> IOError -> ErrorCommand) -> RethrowPolicy
forall e.
Exception e =>
(ErrorContext -> e -> ErrorCommand) -> RethrowPolicy
mkRethrowPolicy ((ErrorContext -> IOError -> ErrorCommand) -> RethrowPolicy)
-> (ErrorContext -> IOError -> ErrorCommand) -> RethrowPolicy
forall a b. (a -> b) -> a -> b
$
      \ErrorContext
_ (IOError
_ :: IOError) -> ErrorCommand
ShutdownPeer


assertRethrowPolicy :: RethrowPolicy
assertRethrowPolicy :: RethrowPolicy
assertRethrowPolicy =
    (ErrorContext -> AssertionFailed -> ErrorCommand) -> RethrowPolicy
forall e.
Exception e =>
(ErrorContext -> e -> ErrorCommand) -> RethrowPolicy
mkRethrowPolicy ((ErrorContext -> AssertionFailed -> ErrorCommand)
 -> RethrowPolicy)
-> (ErrorContext -> AssertionFailed -> ErrorCommand)
-> RethrowPolicy
forall a b. (a -> b) -> a -> b
$
      \ErrorContext
_ (AssertionFailed
_ :: AssertionFailed) -> ErrorCommand
ShutdownNode

type WithNameAndBearer name addr = Compose (WithName name) (Mx.WithBearer (ConnectionId addr))

-- | Runs an example server which runs a single 'ReqResp' protocol for any hot
-- \/ warm \/ established peers and also gives access to bidirectional
-- 'ConnectionManager'.  This gives a way to connect to other peers.
-- Slightly unfortunate design decision does not give us a way to create
-- a client per connection.  This means that this connection manager takes list
-- of 'req' type which it will make to the other side (they will be multiplexed
-- across warm \/ how \/ established) protocols.
--
withBidirectionalConnectionManager
    :: forall name peerAddr socket acc req resp m a.
       ( ConnectionManagerMonad m

       , acc ~ [req], resp ~ [req]
       , Ord peerAddr, Show peerAddr, Typeable peerAddr
       , Serialise req, Typeable req

       -- debugging
       , MonadAsync m
       , MonadDelay m
       , MonadFix m
       , MonadLabelledSTM m
       , MonadTraceSTM m
       , MonadSay m, Show req
       , Show name
       )
    => name
    -> Timeouts
    -- ^ identifier (for logging)
    -> Tracer m (WithName name (RemoteTransitionTrace peerAddr))
    -> Tracer m (WithName name (AbstractTransitionTrace CM.ConnStateId))
    -> Tracer m (WithName name
                          (CM.Trace
                            peerAddr
                            (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
    -> Tracer m (WithName name (InboundGovernor.Trace peerAddr))
    -> Mx.Tracers' m (WithNameAndBearer name peerAddr)
    -> Tracer m (WithName name (InboundGovernor.Debug peerAddr DataFlowProtocolData))
    -> StdGen
    -> Snocket m socket peerAddr
    -> Mx.MakeBearer m socket
    -> CM.ConnStateIdSupply m
    -> (socket -> m ()) -- ^ configure socket
    -> socket
    -- ^ listening socket
    -> Maybe peerAddr
    -> acc
    -- ^ Initial state for the server
    -> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
    -- ^ Functions to get the next requests for a given connection
    -- ^ series of request possible to do with the bidirectional connection
    -- manager towards some peer.
    -> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
    -- ^ Handshake time limits
    -> AcceptedConnectionsLimit
    -> (ConnectionManagerWithExpandedCtx
          Mx.InitiatorResponderMode socket peerAddr
          DataFlowProtocolData UnversionedProtocol ByteString m [resp] acc
       -> peerAddr
       -> Async m Void
       -> m a)
    -> m a
withBidirectionalConnectionManager :: forall name peerAddr socket acc req resp (m :: * -> *) a.
(ConnectionManagerMonad m, acc ~ [req], resp ~ [req], Ord peerAddr,
 Show peerAddr, Typeable peerAddr, Serialise req, Typeable req,
 MonadAsync m, MonadDelay m, MonadFix m, MonadLabelledSTM m,
 MonadTraceSTM m, MonadSay m, Show req, Show name) =>
name
-> Timeouts
-> Tracer m (WithName name (RemoteTransitionTrace peerAddr))
-> Tracer m (WithName name (AbstractTransitionTrace ConnStateId))
-> Tracer
     m
     (WithName
        name
        (Trace
           peerAddr
           (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
-> Tracer m (WithName name (Trace peerAddr))
-> Tracers' m (WithNameAndBearer name peerAddr)
-> Tracer m (WithName name (Debug peerAddr DataFlowProtocolData))
-> StdGen
-> Snocket m socket peerAddr
-> MakeBearer m socket
-> ConnStateIdSupply m
-> (socket -> m ())
-> socket
-> Maybe peerAddr
-> acc
-> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
-> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
-> AcceptedConnectionsLimit
-> (ConnectionManagerWithExpandedCtx
      'InitiatorResponderMode
      socket
      peerAddr
      DataFlowProtocolData
      UnversionedProtocol
      ByteString
      m
      [resp]
      acc
    -> peerAddr -> Async m Void -> m a)
-> m a
withBidirectionalConnectionManager name
name Timeouts
timeouts
                                   Tracer m (WithName name (RemoteTransitionTrace peerAddr))
inboundTrTracer Tracer m (WithName name (AbstractTransitionTrace ConnStateId))
trTracer
                                   Tracer
  m
  (WithName
     name
     (Trace
        peerAddr
        (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
tracer Tracer m (WithName name (Trace peerAddr))
inboundTracer Tracers' m (WithNameAndBearer name peerAddr)
muxTracer Tracer m (WithName name (Debug peerAddr DataFlowProtocolData))
debugTracer
                                   StdGen
stdGen
                                   Snocket m socket peerAddr
snocket MakeBearer m socket
makeBearer ConnStateIdSupply m
connStateIdSupply
                                   socket -> m ()
confSock socket
socket
                                   Maybe peerAddr
localAddress
                                   acc
accumulatorInit TemperatureBundle (ConnectionId peerAddr -> STM m [req])
nextRequests
                                   ProtocolTimeLimits (Handshake UnversionedProtocol Term)
handshakeTimeLimits
                                   AcceptedConnectionsLimit
acceptedConnLimit ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  acc
-> peerAddr -> Async m Void -> m a
k = do
    mainThreadId <- m (ThreadId m)
forall (m :: * -> *). MonadThread m => m (ThreadId m)
myThreadId
    inbgovInfoChannel <- newInformationChannel
    let mkConnectionHandler =
          TracersWithBearer (ConnectionId peerAddr) m
-> ForkPolicy peerAddr
-> HandshakeArguments
     (ConnectionId peerAddr) UnversionedProtocol DataFlowProtocolData m
-> Versions
     UnversionedProtocol
     DataFlowProtocolData
     (OuroborosBundle
        'InitiatorResponderMode
        (ExpandedInitiatorContext peerAddr m)
        (ResponderContext peerAddr)
        ByteString
        m
        [resp]
        acc)
-> (ThreadId m, RethrowPolicy)
-> MkMuxConnectionHandler
     'InitiatorResponderMode
     socket
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     peerAddr
     UnversionedProtocol
     DataFlowProtocolData
     ByteString
     m
     [resp]
     acc
-> MuxConnectionHandler
     'InitiatorResponderMode
     socket
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     peerAddr
     UnversionedProtocol
     DataFlowProtocolData
     ByteString
     m
     [resp]
     acc
forall initiatorCtx responderCtx peerAddr (muxMode :: Mode) socket
       versionNumber versionData (m :: * -> *) a b.
(Alternative (STM m), MonadAsync m, MonadDelay m, MonadFork m,
 MonadLabelledSTM m, MonadThrow (STM m), MonadTimer m, MonadMask m,
 Ord versionNumber, Show peerAddr, Typeable peerAddr) =>
TracersWithBearer (ConnectionId peerAddr) m
-> ForkPolicy peerAddr
-> HandshakeArguments
     (ConnectionId peerAddr) versionNumber versionData m
-> Versions
     versionNumber
     versionData
     (OuroborosBundle
        muxMode initiatorCtx responderCtx ByteString m a b)
-> (ThreadId m, RethrowPolicy)
-> MkMuxConnectionHandler
     muxMode
     socket
     initiatorCtx
     responderCtx
     peerAddr
     versionNumber
     versionData
     ByteString
     m
     a
     b
-> MuxConnectionHandler
     muxMode
     socket
     initiatorCtx
     responderCtx
     peerAddr
     versionNumber
     versionData
     ByteString
     m
     a
     b
makeConnectionHandler
            ((WithName name (WithBearer (ConnectionId peerAddr) x)
-> WithNameAndBearer name peerAddr x
forall {k} {k1} (f :: k -> *) (g :: k1 -> k) (a :: k1).
f (g a) -> Compose f g a
Compose (WithName name (WithBearer (ConnectionId peerAddr) x)
 -> WithNameAndBearer name peerAddr x)
-> (WithBearer (ConnectionId peerAddr) x
    -> WithName name (WithBearer (ConnectionId peerAddr) x))
-> WithBearer (ConnectionId peerAddr) x
-> WithNameAndBearer name peerAddr x
forall b c a. (b -> c) -> (a -> b) -> a -> c
. name
-> WithBearer (ConnectionId peerAddr) x
-> WithName name (WithBearer (ConnectionId peerAddr) x)
forall name event. name -> event -> WithName name event
WithName name
name) (forall {x}.
 WithBearer (ConnectionId peerAddr) x
 -> WithNameAndBearer name peerAddr x)
-> Tracers' m (WithNameAndBearer name peerAddr)
-> TracersWithBearer (ConnectionId peerAddr) m
forall (f' :: * -> *) (f :: * -> *) (m :: * -> *).
(forall x. f' x -> f x) -> Tracers' m f -> Tracers' m f'
`Mx.contramapTracers'` Tracers' m (WithNameAndBearer name peerAddr)
muxTracer)
            ForkPolicy peerAddr
forall peerAddr. ForkPolicy peerAddr
noBindForkPolicy
            HandshakeArguments {
                -- TraceSendRecv
                haHandshakeTracer :: Tracer
  m
  (WithBearer
     (ConnectionId peerAddr)
     (TraceSendRecv (Handshake UnversionedProtocol Term)))
haHandshakeTracer = name
-> WithBearer
     (ConnectionId peerAddr)
     (TraceSendRecv (Handshake UnversionedProtocol Term))
-> WithName
     name
     (WithBearer
        (ConnectionId peerAddr)
        (TraceSendRecv (Handshake UnversionedProtocol Term)))
forall name event. name -> event -> WithName name event
WithName name
name (WithBearer
   (ConnectionId peerAddr)
   (TraceSendRecv (Handshake UnversionedProtocol Term))
 -> WithName
      name
      (WithBearer
         (ConnectionId peerAddr)
         (TraceSendRecv (Handshake UnversionedProtocol Term))))
-> Tracer
     m
     (WithName
        name
        (WithBearer
           (ConnectionId peerAddr)
           (TraceSendRecv (Handshake UnversionedProtocol Term))))
-> Tracer
     m
     (WithBearer
        (ConnectionId peerAddr)
        (TraceSendRecv (Handshake UnversionedProtocol Term)))
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer
  m
  (WithName
     name
     (WithBearer
        (ConnectionId peerAddr)
        (TraceSendRecv (Handshake UnversionedProtocol Term))))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer,
                haBearerTracer :: Tracer m (WithBearer (ConnectionId peerAddr) BearerTrace)
haBearerTracer = WithBearer (ConnectionId peerAddr) BearerTrace
-> ZonkAny 0
-> WithName
     (WithBearer (ConnectionId peerAddr) BearerTrace) (ZonkAny 0)
forall name event. name -> event -> WithName name event
WithName (WithBearer (ConnectionId peerAddr) BearerTrace
 -> ZonkAny 0
 -> WithName
      (WithBearer (ConnectionId peerAddr) BearerTrace) (ZonkAny 0))
-> Tracer
     m
     (ZonkAny 0
      -> WithName
           (WithBearer (ConnectionId peerAddr) BearerTrace) (ZonkAny 0))
-> Tracer m (WithBearer (ConnectionId peerAddr) BearerTrace)
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer
  m
  (ZonkAny 0
   -> WithName
        (WithBearer (ConnectionId peerAddr) BearerTrace) (ZonkAny 0))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer,
                haHandshakeCodec :: Codec
  (Handshake UnversionedProtocol Term)
  DeserialiseFailure
  m
  ByteString
haHandshakeCodec = Codec
  (Handshake UnversionedProtocol Term)
  DeserialiseFailure
  m
  ByteString
forall (m :: * -> *).
MonadST m =>
Codec
  (Handshake UnversionedProtocol Term)
  DeserialiseFailure
  m
  ByteString
unversionedHandshakeCodec,
                haVersionDataCodec :: VersionDataCodec Term UnversionedProtocol DataFlowProtocolData
haVersionDataCodec = (UnversionedProtocol -> CodecCBORTerm Text DataFlowProtocolData)
-> VersionDataCodec Term UnversionedProtocol DataFlowProtocolData
forall vNumber vData.
(vNumber -> CodecCBORTerm Text vData)
-> VersionDataCodec Term vNumber vData
cborTermVersionDataCodec UnversionedProtocol -> CodecCBORTerm Text DataFlowProtocolData
dataFlowProtocolDataCodec,
                haAcceptVersion :: DataFlowProtocolData
-> DataFlowProtocolData -> Accept DataFlowProtocolData
haAcceptVersion = DataFlowProtocolData
-> DataFlowProtocolData -> Accept DataFlowProtocolData
forall v. Acceptable v => v -> v -> Accept v
acceptableVersion,
                haQueryVersion :: DataFlowProtocolData -> Bool
haQueryVersion = DataFlowProtocolData -> Bool
forall v. Queryable v => v -> Bool
queryVersion,
                haTimeLimits :: ProtocolTimeLimits (Handshake UnversionedProtocol Term)
haTimeLimits = ProtocolTimeLimits (Handshake UnversionedProtocol Term)
handshakeTimeLimits
              }
            (DataFlow
-> OuroborosBundle
     'InitiatorResponderMode
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     ByteString
     m
     [resp]
     acc
-> Versions
     UnversionedProtocol
     DataFlowProtocolData
     (OuroborosBundle
        'InitiatorResponderMode
        (ExpandedInitiatorContext peerAddr m)
        (ResponderContext peerAddr)
        ByteString
        m
        [resp]
        acc)
forall app.
DataFlow
-> app -> Versions UnversionedProtocol DataFlowProtocolData app
dataFlowProtocol DataFlow
Duplex OuroborosBundle
  'InitiatorResponderMode
  (ExpandedInitiatorContext peerAddr m)
  (ResponderContext peerAddr)
  ByteString
  m
  [resp]
  acc
serverApplication)
            (ThreadId m
mainThreadId,   RethrowPolicy
debugMuxErrorRethrowPolicy
                          RethrowPolicy -> RethrowPolicy -> RethrowPolicy
forall a. Semigroup a => a -> a -> a
<> RethrowPolicy
debugMuxRuntimeErrorRethrowPolicy
                          RethrowPolicy -> RethrowPolicy -> RethrowPolicy
forall a. Semigroup a => a -> a -> a
<> RethrowPolicy
debugIOErrorRethrowPolicy
                          RethrowPolicy -> RethrowPolicy -> RethrowPolicy
forall a. Semigroup a => a -> a -> a
<> RethrowPolicy
assertRethrowPolicy)

        withConnectionManager MuxConnectionHandler
  'InitiatorResponderMode
  socket
  (ExpandedInitiatorContext peerAddr m)
  (ResponderContext peerAddr)
  peerAddr
  UnversionedProtocol
  DataFlowProtocolData
  ByteString
  m
  [resp]
  acc
connectionHandler ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  acc
-> m a
k' =
          Arguments
  (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)
  socket
  peerAddr
  (Handle
     'InitiatorResponderMode
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     DataFlowProtocolData
     ByteString
     m
     [resp]
     acc)
  (HandlerError UnversionedProtocol)
  UnversionedProtocol
  DataFlowProtocolData
  m
  [resp]
  acc
-> InResponderMode
     'InitiatorResponderMode
     (InformationChannel
        (Event
           'InitiatorResponderMode
           (Handle
              'InitiatorResponderMode
              (ExpandedInitiatorContext peerAddr m)
              (ResponderContext peerAddr)
              DataFlowProtocolData
              ByteString
              m
              [resp]
              acc)
           (ExpandedInitiatorContext peerAddr m)
           peerAddr
           DataFlowProtocolData
           m
           [resp]
           acc)
        m)
-> MuxConnectionHandler
     'InitiatorResponderMode
     socket
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     peerAddr
     UnversionedProtocol
     DataFlowProtocolData
     ByteString
     m
     [resp]
     acc
-> (ConnectionManagerWithExpandedCtx
      'InitiatorResponderMode
      socket
      peerAddr
      DataFlowProtocolData
      UnversionedProtocol
      ByteString
      m
      [resp]
      acc
    -> m a)
-> m a
forall (muxMode :: Mode) peerAddr socket initiatorCtx handlerTrace
       handle handleError version versionData (m :: * -> *) a b x.
(Alternative (STM m), MonadLabelledSTM m, MonadTraceSTM m,
 MonadFork m, MonadAsync m, MonadDelay m, MonadEvaluate m,
 MonadFix m, MonadMask m, MonadThrow (STM m), MonadTimer m,
 Ord peerAddr, Show peerAddr, Typeable peerAddr) =>
Arguments
  handlerTrace
  socket
  peerAddr
  handle
  handleError
  version
  versionData
  m
  a
  b
-> InResponderMode
     muxMode
     (InformationChannel
        (Event muxMode handle initiatorCtx peerAddr versionData m a b) m)
-> ConnectionHandler
     muxMode
     handlerTrace
     socket
     peerAddr
     handle
     handleError
     version
     versionData
     m
-> (ConnectionManager muxMode socket peerAddr handle handleError m
    -> m x)
-> m x
CM.with CM.Arguments {
            -- ConnectionManagerTrace
            tracer :: Tracer
  m
  (Trace
     peerAddr
     (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData))
tracer    = name
-> Trace
     peerAddr
     (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)
-> WithName
     name
     (Trace
        peerAddr
        (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData))
forall name event. name -> event -> WithName name event
WithName name
name
                          (Trace
   peerAddr
   (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)
 -> WithName
      name
      (Trace
         peerAddr
         (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
-> Tracer
     m
     (WithName
        name
        (Trace
           peerAddr
           (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
-> Tracer
     m
     (Trace
        peerAddr
        (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData))
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer
  m
  (WithName
     name
     (Trace
        peerAddr
        (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
tracer,
            trTracer :: Tracer
  m
  (TransitionTrace
     ConnStateId
     (ConnectionState
        peerAddr
        (Handle
           'InitiatorResponderMode
           (ExpandedInitiatorContext peerAddr m)
           (ResponderContext peerAddr)
           DataFlowProtocolData
           ByteString
           m
           [resp]
           acc)
        (HandlerError UnversionedProtocol)
        UnversionedProtocol
        m))
trTracer  = (name
-> AbstractTransitionTrace ConnStateId
-> WithName name (AbstractTransitionTrace ConnStateId)
forall name event. name -> event -> WithName name event
WithName name
name (AbstractTransitionTrace ConnStateId
 -> WithName name (AbstractTransitionTrace ConnStateId))
-> (TransitionTrace
      ConnStateId
      (ConnectionState
         peerAddr
         (Handle
            'InitiatorResponderMode
            (ExpandedInitiatorContext peerAddr m)
            (ResponderContext peerAddr)
            DataFlowProtocolData
            ByteString
            m
            [resp]
            acc)
         (HandlerError UnversionedProtocol)
         UnversionedProtocol
         m)
    -> AbstractTransitionTrace ConnStateId)
-> TransitionTrace
     ConnStateId
     (ConnectionState
        peerAddr
        (Handle
           'InitiatorResponderMode
           (ExpandedInitiatorContext peerAddr m)
           (ResponderContext peerAddr)
           DataFlowProtocolData
           ByteString
           m
           [resp]
           acc)
        (HandlerError UnversionedProtocol)
        UnversionedProtocol
        m)
-> WithName name (AbstractTransitionTrace ConnStateId)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (MaybeUnknown
   (ConnectionState
      peerAddr
      (Handle
         'InitiatorResponderMode
         (ExpandedInitiatorContext peerAddr m)
         (ResponderContext peerAddr)
         DataFlowProtocolData
         ByteString
         m
         [resp]
         acc)
      (HandlerError UnversionedProtocol)
      UnversionedProtocol
      m)
 -> AbstractState)
-> TransitionTrace
     ConnStateId
     (ConnectionState
        peerAddr
        (Handle
           'InitiatorResponderMode
           (ExpandedInitiatorContext peerAddr m)
           (ResponderContext peerAddr)
           DataFlowProtocolData
           ByteString
           m
           [resp]
           acc)
        (HandlerError UnversionedProtocol)
        UnversionedProtocol
        m)
-> AbstractTransitionTrace ConnStateId
forall a b.
(a -> b)
-> TransitionTrace' ConnStateId a -> TransitionTrace' ConnStateId b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap MaybeUnknown
  (ConnectionState
     peerAddr
     (Handle
        'InitiatorResponderMode
        (ExpandedInitiatorContext peerAddr m)
        (ResponderContext peerAddr)
        DataFlowProtocolData
        ByteString
        m
        [resp]
        acc)
     (HandlerError UnversionedProtocol)
     UnversionedProtocol
     m)
-> AbstractState
forall muxMode peerAddr m a (b :: * -> *).
MaybeUnknown (ConnectionState muxMode peerAddr m a b)
-> AbstractState
CM.abstractState)
                          (TransitionTrace
   ConnStateId
   (ConnectionState
      peerAddr
      (Handle
         'InitiatorResponderMode
         (ExpandedInitiatorContext peerAddr m)
         (ResponderContext peerAddr)
         DataFlowProtocolData
         ByteString
         m
         [resp]
         acc)
      (HandlerError UnversionedProtocol)
      UnversionedProtocol
      m)
 -> WithName name (AbstractTransitionTrace ConnStateId))
-> Tracer m (WithName name (AbstractTransitionTrace ConnStateId))
-> Tracer
     m
     (TransitionTrace
        ConnStateId
        (ConnectionState
           peerAddr
           (Handle
              'InitiatorResponderMode
              (ExpandedInitiatorContext peerAddr m)
              (ResponderContext peerAddr)
              DataFlowProtocolData
              ByteString
              m
              [resp]
              acc)
           (HandlerError UnversionedProtocol)
           UnversionedProtocol
           m))
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer m (WithName name (AbstractTransitionTrace ConnStateId))
trTracer,
            -- low level bearer tracer
            ipv4Address :: Maybe peerAddr
ipv4Address  = Maybe peerAddr
localAddress,
            ipv6Address :: Maybe peerAddr
ipv6Address  = Maybe peerAddr
forall a. Maybe a
Nothing,
            addressType :: peerAddr -> Maybe AddressType
addressType  = \peerAddr
_ -> AddressType -> Maybe AddressType
forall a. a -> Maybe a
Just AddressType
IPv4Address,
            Snocket m socket peerAddr
snocket :: Snocket m socket peerAddr
snocket :: Snocket m socket peerAddr
snocket,
            MakeBearer m socket
makeBearer :: MakeBearer m socket
makeBearer :: MakeBearer m socket
makeBearer,
            withBuffer :: (Maybe (ReadBuffer m) -> m ()) -> m ()
withBuffer = \Maybe (ReadBuffer m) -> m ()
f -> Maybe (ReadBuffer m) -> m ()
f Maybe (ReadBuffer m)
forall a. Maybe a
Nothing,
            configureSocket :: socket -> Maybe peerAddr -> m ()
configureSocket = \socket
sock Maybe peerAddr
_ -> socket -> m ()
confSock socket
sock,
            connectionDataFlow :: DataFlowProtocolData -> DataFlow
connectionDataFlow = \(DataFlowProtocolData DataFlow
df PeerSharing
_) -> DataFlow
df,
            timeWaitTimeout :: DiffTime
timeWaitTimeout = Timeouts -> DiffTime
tTimeWaitTimeout Timeouts
timeouts,
            outboundIdleTimeout :: DiffTime
outboundIdleTimeout = Timeouts -> DiffTime
tOutboundIdleTimeout Timeouts
timeouts,
            -- CM.connectionDataFlow = \(DataFlowProtocolData df _) -> df,
            prunePolicy :: PrunePolicy peerAddr
prunePolicy = PrunePolicy peerAddr
forall peerAddr. Ord peerAddr => PrunePolicy peerAddr
simplePrunePolicy,
            StdGen
stdGen :: StdGen
stdGen :: StdGen
stdGen,
            connectionsLimits :: AcceptedConnectionsLimit
connectionsLimits = AcceptedConnectionsLimit
acceptedConnLimit,
            updateVersionData :: DataFlowProtocolData -> DiffusionMode -> DataFlowProtocolData
updateVersionData = \DataFlowProtocolData
versionData DiffusionMode
diffusionMode ->
                                  DataFlowProtocolData
versionData { getProtocolDataFlow =
                                                  case diffusionMode of
                                                    DiffusionMode
InitiatorOnlyDiffusionMode         -> DataFlow
Unidirectional
                                                    DiffusionMode
InitiatorAndResponderDiffusionMode -> DataFlow
Duplex
                                              },
            ConnStateIdSupply m
connStateIdSupply :: ConnStateIdSupply m
connStateIdSupply :: ConnStateIdSupply m
connStateIdSupply,
            classifyHandlerError :: HandlerError UnversionedProtocol -> HandlerErrorType
classifyHandlerError = (\HandlerError UnversionedProtocol
_ -> HandlerErrorType
HandshakeFailure)
            }
            (InformationChannel
  (Event
     'InitiatorResponderMode
     (Handle
        'InitiatorResponderMode
        (ExpandedInitiatorContext peerAddr m)
        (ResponderContext peerAddr)
        DataFlowProtocolData
        ByteString
        m
        [resp]
        acc)
     (ExpandedInitiatorContext peerAddr m)
     peerAddr
     DataFlowProtocolData
     m
     [resp]
     acc)
  m
-> InResponderMode
     'InitiatorResponderMode
     (InformationChannel
        (Event
           'InitiatorResponderMode
           (Handle
              'InitiatorResponderMode
              (ExpandedInitiatorContext peerAddr m)
              (ResponderContext peerAddr)
              DataFlowProtocolData
              ByteString
              m
              [resp]
              acc)
           (ExpandedInitiatorContext peerAddr m)
           peerAddr
           DataFlowProtocolData
           m
           [resp]
           acc)
        m)
forall (mode :: Mode) a.
(HasResponder mode ~ 'True) =>
a -> InResponderMode mode a
InResponderMode InformationChannel
  (Event
     'InitiatorResponderMode
     (Handle
        'InitiatorResponderMode
        (ExpandedInitiatorContext peerAddr m)
        (ResponderContext peerAddr)
        DataFlowProtocolData
        ByteString
        m
        [resp]
        acc)
     (ExpandedInitiatorContext peerAddr m)
     peerAddr
     DataFlowProtocolData
     m
     [resp]
     acc)
  m
inbgovInfoChannel)
            MuxConnectionHandler
  'InitiatorResponderMode
  socket
  (ExpandedInitiatorContext peerAddr m)
  (ResponderContext peerAddr)
  peerAddr
  UnversionedProtocol
  DataFlowProtocolData
  ByteString
  m
  [resp]
  acc
connectionHandler
            ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  acc
-> m a
k'

    serverAddr <- Snocket.getLocalAddr snocket socket
    handle (\(SomeException
e :: SomeException) -> SomeException -> m a
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SomeException
e) $
      Server.with
        Server.Arguments {
          sockets = socket :| [],
          snocket = snocket,
          tracer =
            WithName name `contramap` nullTracer, -- ServerTrace
          connectionLimits = acceptedConnLimit,
          inboundGovernorArgs =
            InboundGovernor.Arguments {
              transitionTracer =
               WithName name `contramap` inboundTrTracer,
              tracer =
                WithName name `contramap` inboundTracer, -- InboundGovernorTrace
              debugTracer =
                WithName name `contramap` debugTracer,
              connectionDataFlow = \(DataFlowProtocolData DataFlow
df PeerSharing
_) -> DataFlow
df,
              infoChannel = inbgovInfoChannel,
              idleTimeout = Just (tProtocolIdleTimeout timeouts),
              withConnectionManager,
              mkConnectionHandler = mkConnectionHandler . MuxInitiatorResponderConnectionHandler (\(DataFlowProtocolData DataFlow
df PeerSharing
_) -> DataFlow
df)
              }
          }
          (\Async m Void
inboundGovernorAsync m (PublicState peerAddr DataFlowProtocolData)
_ ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  acc
connectionManager -> ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  acc
-> peerAddr -> Async m Void -> m a
k ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  acc
connectionManager peerAddr
serverAddr Async m Void
inboundGovernorAsync)
  where
    serverApplication :: TemperatureBundle
                          [MiniProtocol Mx.InitiatorResponderMode
                                        (ExpandedInitiatorContext peerAddr m)
                                        (ResponderContext peerAddr)
                                        ByteString m [resp] acc]
    serverApplication :: OuroborosBundle
  'InitiatorResponderMode
  (ExpandedInitiatorContext peerAddr m)
  (ResponderContext peerAddr)
  ByteString
  m
  [resp]
  acc
serverApplication = MiniProtocolNum
-> (ConnectionId peerAddr -> STM m [req])
-> [MiniProtocol
      'InitiatorResponderMode
      (ExpandedInitiatorContext peerAddr m)
      (ResponderContext peerAddr)
      ByteString
      m
      [resp]
      acc]
mkProto (MiniProtocolNum
 -> (ConnectionId peerAddr -> STM m [req])
 -> [MiniProtocol
       'InitiatorResponderMode
       (ExpandedInitiatorContext peerAddr m)
       (ResponderContext peerAddr)
       ByteString
       m
       [resp]
       acc])
-> TemperatureBundle MiniProtocolNum
-> TemperatureBundle
     ((ConnectionId peerAddr -> STM m [req])
      -> [MiniProtocol
            'InitiatorResponderMode
            (ExpandedInitiatorContext peerAddr m)
            (ResponderContext peerAddr)
            ByteString
            m
            [resp]
            acc])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Word16 -> MiniProtocolNum
Mx.MiniProtocolNum (Word16 -> MiniProtocolNum)
-> TemperatureBundle Word16 -> TemperatureBundle MiniProtocolNum
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TemperatureBundle Word16
nums) TemperatureBundle
  ((ConnectionId peerAddr -> STM m [req])
   -> [MiniProtocol
         'InitiatorResponderMode
         (ExpandedInitiatorContext peerAddr m)
         (ResponderContext peerAddr)
         ByteString
         m
         [resp]
         acc])
-> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
-> OuroborosBundle
     'InitiatorResponderMode
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     ByteString
     m
     [resp]
     acc
forall a b.
TemperatureBundle (a -> b)
-> TemperatureBundle a -> TemperatureBundle b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
nextRequests
      where nums :: TemperatureBundle Word16
nums = WithProtocolTemperature 'Hot Word16
-> WithProtocolTemperature 'Warm Word16
-> WithProtocolTemperature 'Established Word16
-> TemperatureBundle Word16
forall a.
WithProtocolTemperature 'Hot a
-> WithProtocolTemperature 'Warm a
-> WithProtocolTemperature 'Established a
-> TemperatureBundle a
TemperatureBundle (Word16 -> WithProtocolTemperature 'Hot Word16
forall a. a -> WithProtocolTemperature 'Hot a
WithHot Word16
1) (Word16 -> WithProtocolTemperature 'Warm Word16
forall a. a -> WithProtocolTemperature 'Warm a
WithWarm Word16
2) (Word16 -> WithProtocolTemperature 'Established Word16
forall a. a -> WithProtocolTemperature 'Established a
WithEstablished Word16
3)
            mkProto :: MiniProtocolNum
-> (ConnectionId peerAddr -> STM m [req])
-> [MiniProtocol
      'InitiatorResponderMode
      (ExpandedInitiatorContext peerAddr m)
      (ResponderContext peerAddr)
      ByteString
      m
      [resp]
      acc]
mkProto MiniProtocolNum
miniProtocolNum ConnectionId peerAddr -> STM m [req]
nextRequest =
              [MiniProtocol {
                  MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum,
                  miniProtocolStart :: StartOnDemandOrEagerly
miniProtocolStart  = StartOnDemandOrEagerly
Mx.StartOnDemand,
                  miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits = Int -> MiniProtocolLimits
Mx.MiniProtocolLimits Int
forall a. Bounded a => a
maxBound,
                  miniProtocolRun :: RunMiniProtocol
  'InitiatorResponderMode
  (ExpandedInitiatorContext peerAddr m)
  (ResponderContext peerAddr)
  ByteString
  m
  [resp]
  acc
miniProtocolRun = MiniProtocolNum
-> acc
-> (ConnectionId peerAddr -> STM m [req])
-> RunMiniProtocol
     'InitiatorResponderMode
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     ByteString
     m
     [resp]
     acc
reqRespInitiatorAndResponder
                                        MiniProtocolNum
miniProtocolNum
                                        acc
accumulatorInit
                                        ConnectionId peerAddr -> STM m [req]
nextRequest
              }]

    reqRespInitiatorAndResponder
      :: Mx.MiniProtocolNum
      -> acc
      -> (ConnectionId peerAddr -> STM m [req])
      -> RunMiniProtocol Mx.InitiatorResponderMode
                         (ExpandedInitiatorContext peerAddr m)
                         (ResponderContext peerAddr)
                         ByteString m [resp] acc
    reqRespInitiatorAndResponder :: MiniProtocolNum
-> acc
-> (ConnectionId peerAddr -> STM m [req])
-> RunMiniProtocol
     'InitiatorResponderMode
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     ByteString
     m
     [resp]
     acc
reqRespInitiatorAndResponder MiniProtocolNum
protocolNum acc
accInit ConnectionId peerAddr -> STM m [req]
nextRequest =
      MiniProtocolCb
  (ExpandedInitiatorContext peerAddr m) ByteString m [resp]
-> MiniProtocolCb (ResponderContext peerAddr) ByteString m acc
-> RunMiniProtocol
     'InitiatorResponderMode
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     ByteString
     m
     [resp]
     acc
forall initiatorCtx bytes (m :: * -> *) a responderCtx b.
MiniProtocolCb initiatorCtx bytes m a
-> MiniProtocolCb responderCtx bytes m b
-> RunMiniProtocol
     'InitiatorResponderMode initiatorCtx responderCtx bytes m a b
InitiatorAndResponderProtocol
        ((ExpandedInitiatorContext peerAddr m
 -> Channel m ByteString -> m ([resp], Maybe ByteString))
-> MiniProtocolCb
     (ExpandedInitiatorContext peerAddr m) ByteString m [resp]
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb ((ExpandedInitiatorContext peerAddr m
  -> Channel m ByteString -> m ([resp], Maybe ByteString))
 -> MiniProtocolCb
      (ExpandedInitiatorContext peerAddr m) ByteString m [resp])
-> (ExpandedInitiatorContext peerAddr m
    -> Channel m ByteString -> m ([resp], Maybe ByteString))
-> MiniProtocolCb
     (ExpandedInitiatorContext peerAddr m) ByteString m [resp]
forall a b. (a -> b) -> a -> b
$ \ExpandedInitiatorContext { eicConnectionId :: forall addr (m :: * -> *).
ExpandedInitiatorContext addr m -> ConnectionId addr
eicConnectionId = ConnectionId peerAddr
connId } Channel m ByteString
channel ->
           Tracer m (TraceSendRecv (ReqResp req resp))
-> Codec (ReqResp req resp) DeserialiseFailure m ByteString
-> ProtocolSizeLimits (ReqResp req resp) ByteString
-> ProtocolTimeLimits (ReqResp req resp)
-> Channel m ByteString
-> Peer (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp]
-> m ([resp], Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
       a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
 MonadTimer m, ShowProxy ps,
 forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
 Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runPeerWithLimits
             ((name, String, MiniProtocolNum)
-> TraceSendRecv (ReqResp req resp)
-> WithName
     (name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp))
forall name event. name -> event -> WithName name event
WithName (name
name,String
"Initiator",MiniProtocolNum
protocolNum) (TraceSendRecv (ReqResp req resp)
 -> WithName
      (name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp)))
-> Tracer
     m
     (WithName
        (name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp)))
-> Tracer m (TraceSendRecv (ReqResp req resp))
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer
  m
  (WithName
     (name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp)))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer)
             -- TraceSendRecv
             Codec (ReqResp req resp) DeserialiseFailure m ByteString
forall req resp (m :: * -> *).
(MonadST m, Serialise req, Serialise resp) =>
Codec (ReqResp req resp) DeserialiseFailure m ByteString
codecReqResp
             ProtocolSizeLimits (ReqResp req resp) ByteString
forall {k} {k1} (req :: k) (resp :: k1).
ProtocolSizeLimits (ReqResp req resp) ByteString
reqRespSizeLimits
             ProtocolTimeLimits (ReqResp req resp)
forall {k} {k1} (req :: k) (resp :: k1).
ProtocolTimeLimits (ReqResp req resp)
reqRespTimeLimits
             Channel m ByteString
channel
             (m (Peer
     (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp])
-> Peer (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp]
forall ps (pl :: IsPipelined) (st :: ps) (m :: * -> *) a.
m (Client ps pl st m a) -> Client ps pl st m a
Effect (m (Peer
      (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp])
 -> Peer
      (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp])
-> m (Peer
        (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp])
-> Peer (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp]
forall a b. (a -> b) -> a -> b
$ do
               reqs <- STM m [req] -> m [req]
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (ConnectionId peerAddr -> STM m [req]
nextRequest ConnectionId peerAddr
connId)
               pure $ reqRespClientPeer (reqRespClientMap reqs)))
        ((ResponderContext peerAddr
 -> Channel m ByteString -> m (acc, Maybe ByteString))
-> MiniProtocolCb (ResponderContext peerAddr) ByteString m acc
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb ((ResponderContext peerAddr
  -> Channel m ByteString -> m (acc, Maybe ByteString))
 -> MiniProtocolCb (ResponderContext peerAddr) ByteString m acc)
-> (ResponderContext peerAddr
    -> Channel m ByteString -> m (acc, Maybe ByteString))
-> MiniProtocolCb (ResponderContext peerAddr) ByteString m acc
forall a b. (a -> b) -> a -> b
$ \ResponderContext peerAddr
_ctx Channel m ByteString
channel ->
           Tracer m (TraceSendRecv (ReqResp req resp))
-> Codec (ReqResp req resp) DeserialiseFailure m ByteString
-> ProtocolSizeLimits (ReqResp req resp) ByteString
-> ProtocolTimeLimits (ReqResp req resp)
-> Channel m ByteString
-> Peer (ReqResp req resp) 'AsServer 'NonPipelined 'StIdle m acc
-> m (acc, Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
       a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
 MonadTimer m, ShowProxy ps,
 forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
 Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runPeerWithLimits
             ((name, String, MiniProtocolNum)
-> TraceSendRecv (ReqResp req resp)
-> WithName
     (name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp))
forall name event. name -> event -> WithName name event
WithName (name
name,String
"Responder",MiniProtocolNum
protocolNum) (TraceSendRecv (ReqResp req resp)
 -> WithName
      (name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp)))
-> Tracer
     m
     (WithName
        (name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp)))
-> Tracer m (TraceSendRecv (ReqResp req resp))
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer
  m
  (WithName
     (name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp)))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer)
             -- TraceSendRecv
             Codec (ReqResp req resp) DeserialiseFailure m ByteString
forall req resp (m :: * -> *).
(MonadST m, Serialise req, Serialise resp) =>
Codec (ReqResp req resp) DeserialiseFailure m ByteString
codecReqResp
             ProtocolSizeLimits (ReqResp req resp) ByteString
forall {k} {k1} (req :: k) (resp :: k1).
ProtocolSizeLimits (ReqResp req resp) ByteString
reqRespSizeLimits
             ProtocolTimeLimits (ReqResp req resp)
forall {k} {k1} (req :: k) (resp :: k1).
ProtocolTimeLimits (ReqResp req resp)
reqRespTimeLimits
             Channel m ByteString
channel
             (ReqRespServer req resp m acc
-> Peer (ReqResp req resp) 'AsServer 'NonPipelined 'StIdle m acc
forall (m :: * -> *) req resp a.
Monad m =>
ReqRespServer req resp m a
-> Server (ReqResp req resp) 'NonPipelined 'StIdle m a
reqRespServerPeer (ReqRespServer req resp m acc
 -> Peer (ReqResp req resp) 'AsServer 'NonPipelined 'StIdle m acc)
-> ReqRespServer req resp m acc
-> Peer (ReqResp req resp) 'AsServer 'NonPipelined 'StIdle m acc
forall a b. (a -> b) -> a -> b
$ acc -> ReqRespServer req resp m acc
reqRespServerMapAccumL' acc
accInit))

    reqRespServerMapAccumL' :: acc -> ReqRespServer req resp m acc
    reqRespServerMapAccumL' :: acc -> ReqRespServer req resp m acc
reqRespServerMapAccumL' = acc -> ReqRespServer req resp m acc
[req] -> ReqRespServer req [req] m [req]
forall {m :: * -> *} {a}.
Monad m =>
[a] -> ReqRespServer a [a] m [a]
go
      where
        fn :: [a] -> a -> ([a], [a])
fn [a]
acc a
x = (a
x a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
acc, a
x a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
acc)
        go :: [a] -> ReqRespServer a [a] m [a]
go [a]
acc =
          ReqRespServer {
              recvMsgReq :: a -> m ([a], ReqRespServer a [a] m [a])
recvMsgReq = \a
req ->
                  let ([a]
acc', [a]
resp) = [a] -> a -> ([a], [a])
forall {a}. [a] -> a -> ([a], [a])
fn [a]
acc a
req
                  in ([a], ReqRespServer a [a] m [a])
-> m ([a], ReqRespServer a [a] m [a])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ([a]
resp, [a] -> ReqRespServer a [a] m [a]
go [a]
acc'),
              recvMsgDone :: m [a]
recvMsgDone = [a] -> m [a]
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return [a]
acc
            }


reqRespSizeLimits :: forall req resp. ProtocolSizeLimits (ReqResp req resp)
                                                         ByteString
reqRespSizeLimits :: forall {k} {k1} (req :: k) (resp :: k1).
ProtocolSizeLimits (ReqResp req resp) ByteString
reqRespSizeLimits = ProtocolSizeLimits
    { StateToken st -> Word
forall (st :: ReqResp req resp).
ActiveState st =>
StateToken st -> Word
forall (st :: ReqResp req resp). StateToken st -> Word
sizeLimitForState :: forall (st :: ReqResp req resp). StateToken st -> Word
sizeLimitForState :: forall (st :: ReqResp req resp).
ActiveState st =>
StateToken st -> Word
sizeLimitForState
    , dataSize :: ByteString -> Word
dataSize = Int64 -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> Word) -> (ByteString -> Int64) -> ByteString -> Word
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Int64
LBS.length
    }
  where
    sizeLimitForState :: forall (st :: ReqResp req resp).
                         StateToken st -> Word
    sizeLimitForState :: forall (st :: ReqResp req resp). StateToken st -> Word
sizeLimitForState StateToken st
_ = Word
forall a. Bounded a => a
maxBound

reqRespTimeLimits :: forall req resp. ProtocolTimeLimits (ReqResp req resp)
reqRespTimeLimits :: forall {k} {k1} (req :: k) (resp :: k1).
ProtocolTimeLimits (ReqResp req resp)
reqRespTimeLimits = ProtocolTimeLimits { StateToken st -> Maybe DiffTime
forall (st :: ReqResp req resp).
ActiveState st =>
StateToken st -> Maybe DiffTime
timeLimitForState :: forall (st :: ReqResp req resp).
ActiveState st =>
StateToken st -> Maybe DiffTime
timeLimitForState :: forall (st :: ReqResp req resp).
ActiveState st =>
StateToken st -> Maybe DiffTime
timeLimitForState }
  where
    timeLimitForState :: forall (st :: ReqResp req resp).
                         ActiveState st
                      => StateToken st -> Maybe DiffTime
    timeLimitForState :: forall (st :: ReqResp req resp).
ActiveState st =>
StateToken st -> Maybe DiffTime
timeLimitForState StateToken st
SReqResp st
SingIdle           = Maybe DiffTime
forall a. Maybe a
Nothing
    timeLimitForState StateToken st
SReqResp st
SingBusy           = DiffTime -> Maybe DiffTime
forall a. a -> Maybe a
Just DiffTime
60
    timeLimitForState a :: StateToken st
a@StateToken st
SReqResp st
ReqResp.SingDone = StateToken 'StDone -> forall a. a
forall ps (st :: ps).
(StateAgency st ~ 'NobodyAgency, ActiveState st) =>
StateToken st -> forall a. a
notActiveState StateToken st
StateToken 'StDone
a



-- | Run all initiator mini-protocols and collect results. Throw exception if
-- any of the thread returned an exception.
--
-- This function assumes that there's one established, one warm and one hot
-- mini-protocol, which is compatible with both
--
-- * 'withInitiatorOnlyConnectionManager', and
-- * 'withBidirectionalConnectionManager'.
--
runInitiatorProtocols
    :: forall muxMode addr m a b.
       ( Alternative (STM m)
       , MonadAsync       m
       , MonadCatch       m
       , MonadLabelledSTM m
       , MonadMask        m
       , MonadSTM         m
       , MonadThrow  (STM m)
       , HasInitiator muxMode ~ True
       , MonadSay         m
       )
    => SingMuxMode muxMode
    -> Mx.Mux muxMode m
    -> OuroborosBundle muxMode (ExpandedInitiatorContext addr m)
                               (ResponderContext addr)
                               ByteString m a b
    -> TemperatureBundle (StrictTVar m ControlMessage)
    -> ConnectionId addr
    -> m (TemperatureBundle a)
runInitiatorProtocols :: forall (muxMode :: Mode) addr (m :: * -> *) a b.
(Alternative (STM m), MonadAsync m, MonadCatch m,
 MonadLabelledSTM m, MonadMask m, MonadSTM m, MonadThrow (STM m),
 HasInitiator muxMode ~ 'True, MonadSay m) =>
SingMuxMode muxMode
-> Mux muxMode m
-> OuroborosBundle
     muxMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     b
-> TemperatureBundle (StrictTVar m ControlMessage)
-> ConnectionId addr
-> m (TemperatureBundle a)
runInitiatorProtocols SingMuxMode muxMode
singMuxMode Mux muxMode m
mux OuroborosBundle
  muxMode
  (ExpandedInitiatorContext addr m)
  (ResponderContext addr)
  ByteString
  m
  a
  b
bundle TemperatureBundle (StrictTVar m ControlMessage)
controlBundle ConnectionId addr
connId = do
    -- start all mini-protocols
    bundle' <- ((MiniProtocolWithExpandedCtx muxMode addr ByteString m a b,
  STM m ControlMessage)
 -> m (STM m (Either SomeException a)))
-> TemperatureBundle
     (MiniProtocolWithExpandedCtx muxMode addr ByteString m a b,
      STM m ControlMessage)
-> m (TemperatureBundle (STM m (Either SomeException a)))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> TemperatureBundle a -> f (TemperatureBundle b)
traverse ((MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
 -> STM m ControlMessage -> m (STM m (Either SomeException a)))
-> (MiniProtocolWithExpandedCtx muxMode addr ByteString m a b,
    STM m ControlMessage)
-> m (STM m (Either SomeException a))
forall a b c. (a -> b -> c) -> (a, b) -> c
uncurry MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
-> STM m ControlMessage -> m (STM m (Either SomeException a))
runInitiator) ((,) (MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
 -> STM m ControlMessage
 -> (MiniProtocolWithExpandedCtx muxMode addr ByteString m a b,
     STM m ControlMessage))
-> TemperatureBundle
     (MiniProtocolWithExpandedCtx muxMode addr ByteString m a b)
-> TemperatureBundle
     (STM m ControlMessage
      -> (MiniProtocolWithExpandedCtx muxMode addr ByteString m a b,
          STM m ControlMessage))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ([MiniProtocolWithExpandedCtx muxMode addr ByteString m a b]
-> MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
forall a. HasCallStack => [a] -> a
head ([MiniProtocolWithExpandedCtx muxMode addr ByteString m a b]
 -> MiniProtocolWithExpandedCtx muxMode addr ByteString m a b)
-> OuroborosBundle
     muxMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     b
-> TemperatureBundle
     (MiniProtocolWithExpandedCtx muxMode addr ByteString m a b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> OuroborosBundle
  muxMode
  (ExpandedInitiatorContext addr m)
  (ResponderContext addr)
  ByteString
  m
  a
  b
bundle)
                                                    TemperatureBundle
  (STM m ControlMessage
   -> (MiniProtocolWithExpandedCtx muxMode addr ByteString m a b,
       STM m ControlMessage))
-> TemperatureBundle (STM m ControlMessage)
-> TemperatureBundle
     (MiniProtocolWithExpandedCtx muxMode addr ByteString m a b,
      STM m ControlMessage)
forall a b.
TemperatureBundle (a -> b)
-> TemperatureBundle a -> TemperatureBundle b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> (StrictTVar m ControlMessage -> STM m ControlMessage
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar (StrictTVar m ControlMessage -> STM m ControlMessage)
-> TemperatureBundle (StrictTVar m ControlMessage)
-> TemperatureBundle (STM m ControlMessage)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TemperatureBundle (StrictTVar m ControlMessage)
controlBundle))
    -- await for their termination
    traverse (atomically >=> either throwIO return)
             bundle'
  where
    runInitiator :: MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
                 -> ControlMessageSTM m
                 -> m (STM m (Either SomeException a))
    runInitiator :: MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
-> STM m ControlMessage -> m (STM m (Either SomeException a))
runInitiator MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
ptcl STM m ControlMessage
controlMessage =
        Mux muxMode m
-> MiniProtocolNum
-> MiniProtocolDirection muxMode
-> StartOnDemandOrEagerly
-> (ByteChannel m -> m (a, Maybe ByteString))
-> m (STM m (Either SomeException a))
forall (mode :: Mode) (m :: * -> *) a.
(Alternative (STM m), MonadSTM m, MonadThrow m,
 MonadThrow (STM m)) =>
Mux mode m
-> MiniProtocolNum
-> MiniProtocolDirection mode
-> StartOnDemandOrEagerly
-> (ByteChannel m -> m (a, Maybe ByteString))
-> m (STM m (Either SomeException a))
Mx.runMiniProtocol
          Mux muxMode m
mux
          (MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
-> MiniProtocolNum
forall (mode :: Mode) initiatorCtx responderCtx bytes (m :: * -> *)
       a b.
MiniProtocol mode initiatorCtx responderCtx bytes m a b
-> MiniProtocolNum
miniProtocolNum MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
ptcl)
          (case SingMuxMode muxMode
singMuxMode of
            SingMuxMode muxMode
SingInitiatorMode          -> MiniProtocolDirection muxMode
MiniProtocolDirection 'InitiatorMode
Mx.InitiatorDirectionOnly
            SingMuxMode muxMode
SingInitiatorResponderMode -> MiniProtocolDirection muxMode
MiniProtocolDirection 'InitiatorResponderMode
Mx.InitiatorDirection)
          StartOnDemandOrEagerly
Mx.StartEagerly
          (MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
-> ExpandedInitiatorContext addr m
-> ByteChannel m
-> m (a, Maybe ByteString)
forall ctx bytes (m :: * -> *) a.
MiniProtocolCb ctx bytes m a
-> ctx -> Channel m bytes -> m (a, Maybe bytes)
runMiniProtocolCb
            (case MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
-> RunMiniProtocol
     muxMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     b
forall (mode :: Mode) initiatorCtx responderCtx bytes (m :: * -> *)
       a b.
MiniProtocol mode initiatorCtx responderCtx bytes m a b
-> RunMiniProtocol mode initiatorCtx responderCtx bytes m a b
miniProtocolRun MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
ptcl of
              InitiatorProtocolOnly MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
initiator           -> MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
initiator
              InitiatorAndResponderProtocol MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
initiator MiniProtocolCb (ResponderContext addr) ByteString m b
_ -> MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
initiator)
            ExpandedInitiatorContext addr m
initiatorCtx)
      where
        initiatorCtx :: ExpandedInitiatorContext addr m
initiatorCtx = ExpandedInitiatorContext {
            eicConnectionId :: ConnectionId addr
eicConnectionId    = ConnectionId addr
connId,
            eicControlMessage :: STM m ControlMessage
eicControlMessage  = STM m ControlMessage
controlMessage,
            eicIsBigLedgerPeer :: IsBigLedgerPeer
eicIsBigLedgerPeer = IsBigLedgerPeer
IsNotBigLedgerPeer
          }

--
-- Experiments \/ Demos & Properties
--

-- | Max bound AcceptedConnectionsLimit
maxAcceptedConnectionsLimit :: AcceptedConnectionsLimit
maxAcceptedConnectionsLimit :: AcceptedConnectionsLimit
maxAcceptedConnectionsLimit = Word32 -> Word32 -> DiffTime -> AcceptedConnectionsLimit
AcceptedConnectionsLimit Word32
forall a. Bounded a => a
maxBound Word32
forall a. Bounded a => a
maxBound DiffTime
0


-- | This test runs an initiator only connection manager (client side) and bidirectional
-- connection manager (which runs a server).  The client connect to the
-- server and runs protocols to completion.
--
-- There is a good reason why we don't run two bidirectional connection managers;
-- If we would do that, when the either side terminates the connection the
-- client side server would through an exception as it is listening.
--
unidirectionalExperiment
    :: forall peerAddr socket acc req resp m.
       ( ConnectionManagerMonad m
       , MonadAsync m
       , MonadDelay m
       , MonadFix m
       , MonadLabelledSTM m
       , MonadTraceSTM m
       , MonadSay m

       , acc ~ [req], resp ~ [req]
       , Ord peerAddr, Show peerAddr, Typeable peerAddr, Eq peerAddr
       , Hashable peerAddr
       , Serialise req, Show req
       , Serialise resp, Show resp, Eq resp
       , Typeable req, Typeable resp
       )
    => StdGen
    -> Timeouts
    -> Snocket m socket peerAddr
    -> Mx.MakeBearer m socket
    -> (socket -> m ())
    -> socket
    -> ClientAndServerData req
    -> m Property
unidirectionalExperiment :: forall peerAddr socket acc req resp (m :: * -> *).
(ConnectionManagerMonad m, MonadAsync m, MonadDelay m, MonadFix m,
 MonadLabelledSTM m, MonadTraceSTM m, MonadSay m, acc ~ [req],
 resp ~ [req], Ord peerAddr, Show peerAddr, Typeable peerAddr,
 Eq peerAddr, Hashable peerAddr, Serialise req, Show req,
 Serialise resp, Show resp, Eq resp, Typeable req, Typeable resp) =>
StdGen
-> Timeouts
-> Snocket m socket peerAddr
-> MakeBearer m socket
-> (socket -> m ())
-> socket
-> ClientAndServerData req
-> m Property
unidirectionalExperiment StdGen
stdGen Timeouts
timeouts Snocket m socket peerAddr
snocket MakeBearer m socket
makeBearer socket -> m ()
confSock socket
socket ClientAndServerData req
clientAndServerData = do
    let (StdGen
stdGen', StdGen
stdGen'') = StdGen -> (StdGen, StdGen)
forall g. RandomGen g => g -> (g, g)
Random.split StdGen
stdGen
    nextReqs <- ClientAndServerData req
-> m (TemperatureBundle (ConnectionId peerAddr -> STM m [req]))
forall req peerAddr (m :: * -> *).
MonadSTM m =>
ClientAndServerData req
-> m (TemperatureBundle (ConnectionId peerAddr -> STM m [req]))
oneshotNextRequests ClientAndServerData req
clientAndServerData
    connStateIdSupply <- atomically $ CM.newConnStateIdSupply (Proxy @m)
    withInitiatorOnlyConnectionManager
      "client" timeouts nullTracer nullTracer stdGen' snocket makeBearer connStateIdSupply Nothing nextReqs
      timeLimitsHandshake maxAcceptedConnectionsLimit
      $ \ConnectionManagerWithExpandedCtx
  'InitiatorMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  Void
connectionManager ->
        String
-> Timeouts
-> Tracer m (WithName String (RemoteTransitionTrace peerAddr))
-> Tracer m (WithName String (AbstractTransitionTrace ConnStateId))
-> Tracer
     m
     (WithName
        String
        (Trace
           peerAddr
           (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
-> Tracer m (WithName String (Trace peerAddr))
-> Tracers' m (WithNameAndBearer String peerAddr)
-> Tracer m (WithName String (Debug peerAddr DataFlowProtocolData))
-> StdGen
-> Snocket m socket peerAddr
-> MakeBearer m socket
-> ConnStateIdSupply m
-> (socket -> m ())
-> socket
-> Maybe peerAddr
-> [req]
-> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
-> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
-> AcceptedConnectionsLimit
-> (ConnectionManager
      'InitiatorResponderMode
      socket
      peerAddr
      (Handle
         'InitiatorResponderMode
         (ExpandedInitiatorContext peerAddr m)
         (ResponderContext peerAddr)
         DataFlowProtocolData
         ByteString
         m
         [[req]]
         [req])
      (HandlerError UnversionedProtocol)
      m
    -> peerAddr -> Async m Void -> m Property)
-> m Property
forall name peerAddr socket acc req resp (m :: * -> *) a.
(ConnectionManagerMonad m, acc ~ [req], resp ~ [req], Ord peerAddr,
 Show peerAddr, Typeable peerAddr, Serialise req, Typeable req,
 MonadAsync m, MonadDelay m, MonadFix m, MonadLabelledSTM m,
 MonadTraceSTM m, MonadSay m, Show req, Show name) =>
name
-> Timeouts
-> Tracer m (WithName name (RemoteTransitionTrace peerAddr))
-> Tracer m (WithName name (AbstractTransitionTrace ConnStateId))
-> Tracer
     m
     (WithName
        name
        (Trace
           peerAddr
           (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
-> Tracer m (WithName name (Trace peerAddr))
-> Tracers' m (WithNameAndBearer name peerAddr)
-> Tracer m (WithName name (Debug peerAddr DataFlowProtocolData))
-> StdGen
-> Snocket m socket peerAddr
-> MakeBearer m socket
-> ConnStateIdSupply m
-> (socket -> m ())
-> socket
-> Maybe peerAddr
-> acc
-> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
-> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
-> AcceptedConnectionsLimit
-> (ConnectionManagerWithExpandedCtx
      'InitiatorResponderMode
      socket
      peerAddr
      DataFlowProtocolData
      UnversionedProtocol
      ByteString
      m
      [resp]
      acc
    -> peerAddr -> Async m Void -> m a)
-> m a
withBidirectionalConnectionManager String
"server" Timeouts
timeouts
                                           Tracer m (WithName String (RemoteTransitionTrace peerAddr))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer Tracer m (WithName String (AbstractTransitionTrace ConnStateId))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer Tracer
  m
  (WithName
     String
     (Trace
        peerAddr
        (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
                                           Tracer m (WithName String (Trace peerAddr))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer Tracers' m (WithNameAndBearer String peerAddr)
forall (m :: * -> *) (f :: * -> *). Applicative m => Tracers' m f
Mx.nullTracers Tracer m (WithName String (Debug peerAddr DataFlowProtocolData))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
                                           StdGen
stdGen''
                                           Snocket m socket peerAddr
snocket MakeBearer m socket
makeBearer ConnStateIdSupply m
connStateIdSupply
                                           socket -> m ()
confSock socket
socket Maybe peerAddr
forall a. Maybe a
Nothing
                                           [ClientAndServerData req -> req
forall req. ClientAndServerData req -> req
accumulatorInit ClientAndServerData req
clientAndServerData]
                                           TemperatureBundle (ConnectionId peerAddr -> STM m [req])
forall (stm :: * -> *) req peerAddr.
Applicative stm =>
TemperatureBundle (ConnectionId peerAddr -> stm [req])
noNextRequests
                                           ProtocolTimeLimits (Handshake UnversionedProtocol Term)
forall {k} (vNumber :: k).
ProtocolTimeLimits (Handshake vNumber Term)
timeLimitsHandshake
                                           AcceptedConnectionsLimit
maxAcceptedConnectionsLimit
          ((ConnectionManager
    'InitiatorResponderMode
    socket
    peerAddr
    (Handle
       'InitiatorResponderMode
       (ExpandedInitiatorContext peerAddr m)
       (ResponderContext peerAddr)
       DataFlowProtocolData
       ByteString
       m
       [[req]]
       [req])
    (HandlerError UnversionedProtocol)
    m
  -> peerAddr -> Async m Void -> m Property)
 -> m Property)
-> (ConnectionManager
      'InitiatorResponderMode
      socket
      peerAddr
      (Handle
         'InitiatorResponderMode
         (ExpandedInitiatorContext peerAddr m)
         (ResponderContext peerAddr)
         DataFlowProtocolData
         ByteString
         m
         [[req]]
         [req])
      (HandlerError UnversionedProtocol)
      m
    -> peerAddr -> Async m Void -> m Property)
-> m Property
forall a b. (a -> b) -> a -> b
$ \ConnectionManager
  'InitiatorResponderMode
  socket
  peerAddr
  (Handle
     'InitiatorResponderMode
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     DataFlowProtocolData
     ByteString
     m
     [[req]]
     [req])
  (HandlerError UnversionedProtocol)
  m
_ peerAddr
serverAddr Async m Void
_serverAsync -> do
            -- client → server: connect
            (rs :: [Either SomeException (TemperatureBundle [resp])]) <-
                Int
-> m (Either SomeException (TemperatureBundle [resp]))
-> m [Either SomeException (TemperatureBundle [resp])]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM
                  (ClientAndServerData req -> Int
forall req. ClientAndServerData req -> Int
numberOfRounds ClientAndServerData req
clientAndServerData)
                  (m (Connected
     peerAddr
     (HandleWithExpandedCtx
        'InitiatorMode
        peerAddr
        DataFlowProtocolData
        ByteString
        m
        [resp]
        Void)
     (HandlerError UnversionedProtocol))
-> (Connected
      peerAddr
      (HandleWithExpandedCtx
         'InitiatorMode
         peerAddr
         DataFlowProtocolData
         ByteString
         m
         [resp]
         Void)
      (HandlerError UnversionedProtocol)
    -> m (OperationResult AbstractState))
-> (Connected
      peerAddr
      (HandleWithExpandedCtx
         'InitiatorMode
         peerAddr
         DataFlowProtocolData
         ByteString
         m
         [resp]
         Void)
      (HandlerError UnversionedProtocol)
    -> m (Either SomeException (TemperatureBundle [resp])))
-> m (Either SomeException (TemperatureBundle [resp]))
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket
                     (ConnectionManagerWithExpandedCtx
  'InitiatorMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  Void
-> AcquireOutboundConnection
     peerAddr
     (HandleWithExpandedCtx
        'InitiatorMode
        peerAddr
        DataFlowProtocolData
        ByteString
        m
        [resp]
        Void)
     (HandlerError UnversionedProtocol)
     m
forall (muxMode :: Mode) socket peerAddr handle handleError
       (m :: * -> *).
(HasInitiator muxMode ~ 'True) =>
ConnectionManager muxMode socket peerAddr handle handleError m
-> AcquireOutboundConnection peerAddr handle handleError m
acquireOutboundConnection ConnectionManagerWithExpandedCtx
  'InitiatorMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  Void
connectionManager DiffusionMode
InitiatorOnlyDiffusionMode peerAddr
serverAddr)
                     (\case
                        Connected ConnectionId peerAddr
connId DataFlow
_ HandleWithExpandedCtx
  'InitiatorMode
  peerAddr
  DataFlowProtocolData
  ByteString
  m
  [resp]
  Void
_ -> ConnectionManagerWithExpandedCtx
  'InitiatorMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  Void
-> ConnectionId peerAddr -> m (OperationResult AbstractState)
forall (muxMode :: Mode) socket peerAddr handle handleError
       (m :: * -> *).
(HasInitiator muxMode ~ 'True) =>
ConnectionManager muxMode socket peerAddr handle handleError m
-> ConnectionId peerAddr -> m (OperationResult AbstractState)
releaseOutboundConnection ConnectionManagerWithExpandedCtx
  'InitiatorMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  Void
connectionManager ConnectionId peerAddr
connId
                        Disconnected {} -> String -> m (OperationResult AbstractState)
forall a. HasCallStack => String -> a
error String
"unidirectionalExperiment: impossible happened")
                     (\Connected
  peerAddr
  (HandleWithExpandedCtx
     'InitiatorMode
     peerAddr
     DataFlowProtocolData
     ByteString
     m
     [resp]
     Void)
  (HandlerError UnversionedProtocol)
connHandle -> do
                      case Connected
  peerAddr
  (HandleWithExpandedCtx
     'InitiatorMode
     peerAddr
     DataFlowProtocolData
     ByteString
     m
     [resp]
     Void)
  (HandlerError UnversionedProtocol)
connHandle of
                        Connected ConnectionId peerAddr
connId DataFlow
_ (Handle Mux 'InitiatorMode m
mux OuroborosBundle
  'InitiatorMode
  (ExpandedInitiatorContext peerAddr m)
  (ResponderContext peerAddr)
  ByteString
  m
  [resp]
  Void
muxBundle TemperatureBundle (StrictTVar m ControlMessage)
controlBundle DataFlowProtocolData
_
                                        :: HandleWithExpandedCtx Mx.InitiatorMode peerAddr
                                              DataFlowProtocolData ByteString m [resp] Void) ->
                          forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try @_ @SomeException (m (TemperatureBundle [resp])
 -> m (Either SomeException (TemperatureBundle [resp])))
-> m (TemperatureBundle [resp])
-> m (Either SomeException (TemperatureBundle [resp]))
forall a b. (a -> b) -> a -> b
$
                            (SingMuxMode 'InitiatorMode
-> Mux 'InitiatorMode m
-> OuroborosBundle
     'InitiatorMode
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     ByteString
     m
     [resp]
     Void
-> TemperatureBundle (StrictTVar m ControlMessage)
-> ConnectionId peerAddr
-> m (TemperatureBundle [resp])
forall (muxMode :: Mode) addr (m :: * -> *) a b.
(Alternative (STM m), MonadAsync m, MonadCatch m,
 MonadLabelledSTM m, MonadMask m, MonadSTM m, MonadThrow (STM m),
 HasInitiator muxMode ~ 'True, MonadSay m) =>
SingMuxMode muxMode
-> Mux muxMode m
-> OuroborosBundle
     muxMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     b
-> TemperatureBundle (StrictTVar m ControlMessage)
-> ConnectionId addr
-> m (TemperatureBundle a)
runInitiatorProtocols
                              SingMuxMode 'InitiatorMode
SingInitiatorMode Mux 'InitiatorMode m
mux OuroborosBundle
  'InitiatorMode
  (ExpandedInitiatorContext peerAddr m)
  (ResponderContext peerAddr)
  ByteString
  m
  [resp]
  Void
muxBundle TemperatureBundle (StrictTVar m ControlMessage)
controlBundle ConnectionId peerAddr
connId
                              :: m (TemperatureBundle [resp])
                            )
                        Disconnected ConnectionId peerAddr
_ DisconnectionException (HandlerError UnversionedProtocol)
err ->
                          IOError -> m (Either SomeException (TemperatureBundle [resp]))
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (String -> IOError
userError (String -> IOError) -> String -> IOError
forall a b. (a -> b) -> a -> b
$ String
"unidirectionalExperiment: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ DisconnectionException (HandlerError UnversionedProtocol) -> String
forall a. Show a => a -> String
show DisconnectionException (HandlerError UnversionedProtocol)
err))
                  )
            pure $
              foldr
                (\ (Either SomeException (TemperatureBundle [resp])
r, TemperatureBundle [resp]
expected) Property
acc ->
                  case Either SomeException (TemperatureBundle [resp])
r of
                    Left SomeException
err -> String -> Bool -> Property
forall prop. Testable prop => String -> prop -> Property
counterexample (SomeException -> String
forall a. Show a => a -> String
show SomeException
err) Bool
False
                    Right TemperatureBundle [resp]
a  -> TemperatureBundle [resp]
a TemperatureBundle [resp] -> TemperatureBundle [resp] -> Property
forall a. (Eq a, Show a) => a -> a -> Property
=== TemperatureBundle [resp]
expected Property -> Property -> Property
forall prop1 prop2.
(Testable prop1, Testable prop2) =>
prop1 -> prop2 -> Property
.&&. Property
acc)
                (property True)
                $ zip rs (expectedResult clientAndServerData clientAndServerData)


-- | Bidirectional send and receive.
--
bidirectionalExperiment
    :: forall peerAddr socket acc req resp m.
       ( ConnectionManagerMonad m
       , MonadAsync m
       , MonadDelay m
       , MonadFix m
       , MonadLabelledSTM m
       , MonadTraceSTM m
       , MonadSay m

       , acc ~ [req], resp ~ [req]
       , Ord peerAddr, Show peerAddr, Typeable peerAddr, Eq peerAddr
       , Hashable peerAddr

       , Serialise req, Show req
       , Serialise resp, Show resp, Eq resp
       , Typeable req, Typeable resp
       , Show acc
       )
    => Bool
    -> StdGen
    -> Timeouts
    -> Snocket m socket peerAddr
    -> Mx.MakeBearer m socket
    -> (socket -> m ()) -- ^ configure socket
    -> socket
    -> socket
    -> peerAddr
    -> peerAddr
    -> ClientAndServerData req
    -> ClientAndServerData req
    -> m Property
bidirectionalExperiment :: forall peerAddr socket acc req resp (m :: * -> *).
(ConnectionManagerMonad m, MonadAsync m, MonadDelay m, MonadFix m,
 MonadLabelledSTM m, MonadTraceSTM m, MonadSay m, acc ~ [req],
 resp ~ [req], Ord peerAddr, Show peerAddr, Typeable peerAddr,
 Eq peerAddr, Hashable peerAddr, Serialise req, Show req,
 Serialise resp, Show resp, Eq resp, Typeable req, Typeable resp,
 Show acc) =>
Bool
-> StdGen
-> Timeouts
-> Snocket m socket peerAddr
-> MakeBearer m socket
-> (socket -> m ())
-> socket
-> socket
-> peerAddr
-> peerAddr
-> ClientAndServerData req
-> ClientAndServerData req
-> m Property
bidirectionalExperiment
    Bool
useLock StdGen
stdGen Timeouts
timeouts Snocket m socket peerAddr
snocket MakeBearer m socket
makeBearer socket -> m ()
confSock socket
socket0 socket
socket1 peerAddr
localAddr0 peerAddr
localAddr1
    ClientAndServerData req
clientAndServerData0 ClientAndServerData req
clientAndServerData1 = do
      let (StdGen
stdGen', StdGen
stdGen'') = StdGen -> (StdGen, StdGen)
forall g. RandomGen g => g -> (g, g)
Random.split StdGen
stdGen
      lock <- () -> m (StrictTMVar m ())
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTMVar m a)
newTMVarIO ()
      connStateIdSupply <- atomically $ CM.newConnStateIdSupply (Proxy @m)
      nextRequests0 <- oneshotNextRequests clientAndServerData0
      nextRequests1 <- oneshotNextRequests clientAndServerData1
      withBidirectionalConnectionManager "node-0" timeouts
                                         nullTracer nullTracer nullTracer nullTracer Mx.nullTracers
                                         nullTracer stdGen' snocket makeBearer
                                         connStateIdSupply confSock
                                         socket0 (Just localAddr0)
                                         [accumulatorInit clientAndServerData0]
                                         nextRequests0
                                         noTimeLimitsHandshake
                                         maxAcceptedConnectionsLimit
        (\ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  [req]
connectionManager0 peerAddr
_serverAddr0 Async m Void
_serverAsync0 -> do
          String
-> Timeouts
-> Tracer m (WithName String (RemoteTransitionTrace peerAddr))
-> Tracer m (WithName String (AbstractTransitionTrace ConnStateId))
-> Tracer
     m
     (WithName
        String
        (Trace
           peerAddr
           (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
-> Tracer m (WithName String (Trace peerAddr))
-> Tracers' m (WithNameAndBearer String peerAddr)
-> Tracer m (WithName String (Debug peerAddr DataFlowProtocolData))
-> StdGen
-> Snocket m socket peerAddr
-> MakeBearer m socket
-> ConnStateIdSupply m
-> (socket -> m ())
-> socket
-> Maybe peerAddr
-> [req]
-> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
-> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
-> AcceptedConnectionsLimit
-> (ConnectionManagerWithExpandedCtx
      'InitiatorResponderMode
      socket
      peerAddr
      DataFlowProtocolData
      UnversionedProtocol
      ByteString
      m
      [resp]
      [req]
    -> peerAddr -> Async m Void -> m Property)
-> m Property
forall name peerAddr socket acc req resp (m :: * -> *) a.
(ConnectionManagerMonad m, acc ~ [req], resp ~ [req], Ord peerAddr,
 Show peerAddr, Typeable peerAddr, Serialise req, Typeable req,
 MonadAsync m, MonadDelay m, MonadFix m, MonadLabelledSTM m,
 MonadTraceSTM m, MonadSay m, Show req, Show name) =>
name
-> Timeouts
-> Tracer m (WithName name (RemoteTransitionTrace peerAddr))
-> Tracer m (WithName name (AbstractTransitionTrace ConnStateId))
-> Tracer
     m
     (WithName
        name
        (Trace
           peerAddr
           (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
-> Tracer m (WithName name (Trace peerAddr))
-> Tracers' m (WithNameAndBearer name peerAddr)
-> Tracer m (WithName name (Debug peerAddr DataFlowProtocolData))
-> StdGen
-> Snocket m socket peerAddr
-> MakeBearer m socket
-> ConnStateIdSupply m
-> (socket -> m ())
-> socket
-> Maybe peerAddr
-> acc
-> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
-> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
-> AcceptedConnectionsLimit
-> (ConnectionManagerWithExpandedCtx
      'InitiatorResponderMode
      socket
      peerAddr
      DataFlowProtocolData
      UnversionedProtocol
      ByteString
      m
      [resp]
      acc
    -> peerAddr -> Async m Void -> m a)
-> m a
withBidirectionalConnectionManager String
"node-1" Timeouts
timeouts
                                             Tracer m (WithName String (RemoteTransitionTrace peerAddr))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer Tracer m (WithName String (AbstractTransitionTrace ConnStateId))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer Tracer
  m
  (WithName
     String
     (Trace
        peerAddr
        (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer Tracer m (WithName String (Trace peerAddr))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer Tracers' m (WithNameAndBearer String peerAddr)
forall (m :: * -> *) (f :: * -> *). Applicative m => Tracers' m f
Mx.nullTracers
                                             Tracer m (WithName String (Debug peerAddr DataFlowProtocolData))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer StdGen
stdGen'' Snocket m socket peerAddr
snocket MakeBearer m socket
makeBearer
                                             ConnStateIdSupply m
connStateIdSupply socket -> m ()
confSock
                                             socket
socket1 (peerAddr -> Maybe peerAddr
forall a. a -> Maybe a
Just peerAddr
localAddr1)
                                             [ClientAndServerData req -> req
forall req. ClientAndServerData req -> req
accumulatorInit ClientAndServerData req
clientAndServerData1]
                                             TemperatureBundle (ConnectionId peerAddr -> STM m [req])
nextRequests1
                                             ProtocolTimeLimits (Handshake UnversionedProtocol Term)
forall {k} (vNumber :: k).
ProtocolTimeLimits (Handshake vNumber Term)
noTimeLimitsHandshake
                                             AcceptedConnectionsLimit
maxAcceptedConnectionsLimit
            (\ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  [req]
connectionManager1 peerAddr
_serverAddr1 Async m Void
serverAsync1 -> do
              Async m Void -> m ()
forall (m :: * -> *) a.
(MonadAsync m, MonadFork m, MonadMask m) =>
Async m a -> m ()
link Async m Void
serverAsync1
              -- runInitiatorProtocols returns a list of results per each
              -- protocol in each bucket (warm \/ hot \/ established); but
              -- we run only one mini-protocol. We can use `concat` to
              -- flatten the results.
              ( rs0 :: [Either SomeException (TemperatureBundle [resp])]
                , rs1 :: [Either SomeException (TemperatureBundle [resp])]
                ) <-
                -- Run initiator twice; this tests if the responders on
                -- the other end are restarted.
                Int
-> m (Either SomeException (TemperatureBundle [resp]))
-> m [Either SomeException (TemperatureBundle [resp])]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM
                  (ClientAndServerData req -> Int
forall req. ClientAndServerData req -> Int
numberOfRounds ClientAndServerData req
clientAndServerData0)
                  (m (Connected
     peerAddr
     (HandleWithExpandedCtx
        'InitiatorResponderMode
        peerAddr
        DataFlowProtocolData
        ByteString
        m
        [resp]
        [req])
     (HandlerError UnversionedProtocol))
-> (Connected
      peerAddr
      (HandleWithExpandedCtx
         'InitiatorResponderMode
         peerAddr
         DataFlowProtocolData
         ByteString
         m
         [resp]
         [req])
      (HandlerError UnversionedProtocol)
    -> m (OperationResult AbstractState))
-> (Connected
      peerAddr
      (HandleWithExpandedCtx
         'InitiatorResponderMode
         peerAddr
         DataFlowProtocolData
         ByteString
         m
         [resp]
         [req])
      (HandlerError UnversionedProtocol)
    -> m (Either SomeException (TemperatureBundle [resp])))
-> m (Either SomeException (TemperatureBundle [resp]))
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket
                    (Bool
-> StrictTMVar m ()
-> m (Connected
        peerAddr
        (HandleWithExpandedCtx
           'InitiatorResponderMode
           peerAddr
           DataFlowProtocolData
           ByteString
           m
           [resp]
           [req])
        (HandlerError UnversionedProtocol))
-> m (Connected
        peerAddr
        (HandleWithExpandedCtx
           'InitiatorResponderMode
           peerAddr
           DataFlowProtocolData
           ByteString
           m
           [resp]
           [req])
        (HandlerError UnversionedProtocol))
forall (m :: * -> *) a.
(MonadSTM m, MonadThrow m) =>
Bool -> StrictTMVar m () -> m a -> m a
withLock Bool
useLock StrictTMVar m ()
lock
                      (ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  [req]
-> AcquireOutboundConnection
     peerAddr
     (HandleWithExpandedCtx
        'InitiatorResponderMode
        peerAddr
        DataFlowProtocolData
        ByteString
        m
        [resp]
        [req])
     (HandlerError UnversionedProtocol)
     m
forall (muxMode :: Mode) socket peerAddr handle handleError
       (m :: * -> *).
(HasInitiator muxMode ~ 'True) =>
ConnectionManager muxMode socket peerAddr handle handleError m
-> AcquireOutboundConnection peerAddr handle handleError m
acquireOutboundConnection
                        ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  [req]
connectionManager0
                        DiffusionMode
InitiatorAndResponderDiffusionMode
                        peerAddr
localAddr1))
                    (\case
                      Connected ConnectionId peerAddr
connId DataFlow
_ HandleWithExpandedCtx
  'InitiatorResponderMode
  peerAddr
  DataFlowProtocolData
  ByteString
  m
  [resp]
  [req]
_ ->
                        ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  [req]
-> ConnectionId peerAddr -> m (OperationResult AbstractState)
forall (muxMode :: Mode) socket peerAddr handle handleError
       (m :: * -> *).
(HasInitiator muxMode ~ 'True) =>
ConnectionManager muxMode socket peerAddr handle handleError m
-> ConnectionId peerAddr -> m (OperationResult AbstractState)
releaseOutboundConnection
                          ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  [req]
connectionManager0
                          ConnectionId peerAddr
connId
                      Disconnected {} ->
                        String -> m (OperationResult AbstractState)
forall a. HasCallStack => String -> a
error String
"bidirectionalExperiment: impossible happened")
                    (\Connected
  peerAddr
  (HandleWithExpandedCtx
     'InitiatorResponderMode
     peerAddr
     DataFlowProtocolData
     ByteString
     m
     [resp]
     [req])
  (HandlerError UnversionedProtocol)
connHandle ->
                      case Connected
  peerAddr
  (HandleWithExpandedCtx
     'InitiatorResponderMode
     peerAddr
     DataFlowProtocolData
     ByteString
     m
     [resp]
     [req])
  (HandlerError UnversionedProtocol)
connHandle of
                        Connected ConnectionId peerAddr
connId DataFlow
_ (Handle Mux 'InitiatorResponderMode m
mux OuroborosBundle
  'InitiatorResponderMode
  (ExpandedInitiatorContext peerAddr m)
  (ResponderContext peerAddr)
  ByteString
  m
  [resp]
  [req]
muxBundle TemperatureBundle (StrictTVar m ControlMessage)
controlBundle DataFlowProtocolData
_) -> do
                          forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try @_ @SomeException (m (TemperatureBundle [resp])
 -> m (Either SomeException (TemperatureBundle [resp])))
-> m (TemperatureBundle [resp])
-> m (Either SomeException (TemperatureBundle [resp]))
forall a b. (a -> b) -> a -> b
$
                            SingMuxMode 'InitiatorResponderMode
-> Mux 'InitiatorResponderMode m
-> OuroborosBundle
     'InitiatorResponderMode
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     ByteString
     m
     [resp]
     [req]
-> TemperatureBundle (StrictTVar m ControlMessage)
-> ConnectionId peerAddr
-> m (TemperatureBundle [resp])
forall (muxMode :: Mode) addr (m :: * -> *) a b.
(Alternative (STM m), MonadAsync m, MonadCatch m,
 MonadLabelledSTM m, MonadMask m, MonadSTM m, MonadThrow (STM m),
 HasInitiator muxMode ~ 'True, MonadSay m) =>
SingMuxMode muxMode
-> Mux muxMode m
-> OuroborosBundle
     muxMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     b
-> TemperatureBundle (StrictTVar m ControlMessage)
-> ConnectionId addr
-> m (TemperatureBundle a)
runInitiatorProtocols
                              SingMuxMode 'InitiatorResponderMode
SingInitiatorResponderMode
                              Mux 'InitiatorResponderMode m
mux OuroborosBundle
  'InitiatorResponderMode
  (ExpandedInitiatorContext peerAddr m)
  (ResponderContext peerAddr)
  ByteString
  m
  [resp]
  [req]
muxBundle TemperatureBundle (StrictTVar m ControlMessage)
controlBundle ConnectionId peerAddr
connId
                        Disconnected ConnectionId peerAddr
_ DisconnectionException (HandlerError UnversionedProtocol)
err ->
                          IOError -> m (Either SomeException (TemperatureBundle [resp]))
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (String -> IOError
userError (String -> IOError) -> String -> IOError
forall a b. (a -> b) -> a -> b
$ String
"bidirectionalExperiment: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ DisconnectionException (HandlerError UnversionedProtocol) -> String
forall a. Show a => a -> String
show DisconnectionException (HandlerError UnversionedProtocol)
err)
                  ))
                m [Either SomeException (TemperatureBundle [resp])]
-> m [Either SomeException (TemperatureBundle [resp])]
-> m ([Either SomeException (TemperatureBundle [resp])],
      [Either SomeException (TemperatureBundle [resp])])
forall a b. m a -> m b -> m (a, b)
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m (a, b)
`concurrently`
                Int
-> m (Either SomeException (TemperatureBundle [resp]))
-> m [Either SomeException (TemperatureBundle [resp])]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM
                  (ClientAndServerData req -> Int
forall req. ClientAndServerData req -> Int
numberOfRounds ClientAndServerData req
clientAndServerData1)
                  (m (Connected
     peerAddr
     (HandleWithExpandedCtx
        'InitiatorResponderMode
        peerAddr
        DataFlowProtocolData
        ByteString
        m
        [resp]
        [req])
     (HandlerError UnversionedProtocol))
-> (Connected
      peerAddr
      (HandleWithExpandedCtx
         'InitiatorResponderMode
         peerAddr
         DataFlowProtocolData
         ByteString
         m
         [resp]
         [req])
      (HandlerError UnversionedProtocol)
    -> m (OperationResult AbstractState))
-> (Connected
      peerAddr
      (HandleWithExpandedCtx
         'InitiatorResponderMode
         peerAddr
         DataFlowProtocolData
         ByteString
         m
         [resp]
         [req])
      (HandlerError UnversionedProtocol)
    -> m (Either SomeException (TemperatureBundle [resp])))
-> m (Either SomeException (TemperatureBundle [resp]))
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket
                    (Bool
-> StrictTMVar m ()
-> m (Connected
        peerAddr
        (HandleWithExpandedCtx
           'InitiatorResponderMode
           peerAddr
           DataFlowProtocolData
           ByteString
           m
           [resp]
           [req])
        (HandlerError UnversionedProtocol))
-> m (Connected
        peerAddr
        (HandleWithExpandedCtx
           'InitiatorResponderMode
           peerAddr
           DataFlowProtocolData
           ByteString
           m
           [resp]
           [req])
        (HandlerError UnversionedProtocol))
forall (m :: * -> *) a.
(MonadSTM m, MonadThrow m) =>
Bool -> StrictTMVar m () -> m a -> m a
withLock Bool
useLock StrictTMVar m ()
lock
                      (ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  [req]
-> AcquireOutboundConnection
     peerAddr
     (HandleWithExpandedCtx
        'InitiatorResponderMode
        peerAddr
        DataFlowProtocolData
        ByteString
        m
        [resp]
        [req])
     (HandlerError UnversionedProtocol)
     m
forall (muxMode :: Mode) socket peerAddr handle handleError
       (m :: * -> *).
(HasInitiator muxMode ~ 'True) =>
ConnectionManager muxMode socket peerAddr handle handleError m
-> AcquireOutboundConnection peerAddr handle handleError m
acquireOutboundConnection
                        ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  [req]
connectionManager1
                        DiffusionMode
InitiatorAndResponderDiffusionMode
                        peerAddr
localAddr0))
                    (\case
                      Connected ConnectionId peerAddr
connId DataFlow
_ HandleWithExpandedCtx
  'InitiatorResponderMode
  peerAddr
  DataFlowProtocolData
  ByteString
  m
  [resp]
  [req]
_ ->
                        ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  [req]
-> ConnectionId peerAddr -> m (OperationResult AbstractState)
forall (muxMode :: Mode) socket peerAddr handle handleError
       (m :: * -> *).
(HasInitiator muxMode ~ 'True) =>
ConnectionManager muxMode socket peerAddr handle handleError m
-> ConnectionId peerAddr -> m (OperationResult AbstractState)
releaseOutboundConnection
                          ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  [req]
connectionManager1
                          ConnectionId peerAddr
connId
                      Disconnected {} ->
                        String -> m (OperationResult AbstractState)
forall a. HasCallStack => String -> a
error String
"ibidirectionalExperiment: impossible happened")
                    (\Connected
  peerAddr
  (HandleWithExpandedCtx
     'InitiatorResponderMode
     peerAddr
     DataFlowProtocolData
     ByteString
     m
     [resp]
     [req])
  (HandlerError UnversionedProtocol)
connHandle ->
                      case Connected
  peerAddr
  (HandleWithExpandedCtx
     'InitiatorResponderMode
     peerAddr
     DataFlowProtocolData
     ByteString
     m
     [resp]
     [req])
  (HandlerError UnversionedProtocol)
connHandle of
                        Connected ConnectionId peerAddr
connId DataFlow
_ (Handle Mux 'InitiatorResponderMode m
mux OuroborosBundle
  'InitiatorResponderMode
  (ExpandedInitiatorContext peerAddr m)
  (ResponderContext peerAddr)
  ByteString
  m
  [resp]
  [req]
muxBundle TemperatureBundle (StrictTVar m ControlMessage)
controlBundle DataFlowProtocolData
_) -> do
                          forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try @_ @SomeException (m (TemperatureBundle [resp])
 -> m (Either SomeException (TemperatureBundle [resp])))
-> m (TemperatureBundle [resp])
-> m (Either SomeException (TemperatureBundle [resp]))
forall a b. (a -> b) -> a -> b
$
                            SingMuxMode 'InitiatorResponderMode
-> Mux 'InitiatorResponderMode m
-> OuroborosBundle
     'InitiatorResponderMode
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     ByteString
     m
     [resp]
     [req]
-> TemperatureBundle (StrictTVar m ControlMessage)
-> ConnectionId peerAddr
-> m (TemperatureBundle [resp])
forall (muxMode :: Mode) addr (m :: * -> *) a b.
(Alternative (STM m), MonadAsync m, MonadCatch m,
 MonadLabelledSTM m, MonadMask m, MonadSTM m, MonadThrow (STM m),
 HasInitiator muxMode ~ 'True, MonadSay m) =>
SingMuxMode muxMode
-> Mux muxMode m
-> OuroborosBundle
     muxMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     b
-> TemperatureBundle (StrictTVar m ControlMessage)
-> ConnectionId addr
-> m (TemperatureBundle a)
runInitiatorProtocols
                              SingMuxMode 'InitiatorResponderMode
SingInitiatorResponderMode
                              Mux 'InitiatorResponderMode m
mux OuroborosBundle
  'InitiatorResponderMode
  (ExpandedInitiatorContext peerAddr m)
  (ResponderContext peerAddr)
  ByteString
  m
  [resp]
  [req]
muxBundle TemperatureBundle (StrictTVar m ControlMessage)
controlBundle ConnectionId peerAddr
connId
                        Disconnected ConnectionId peerAddr
_ DisconnectionException (HandlerError UnversionedProtocol)
err ->
                          IOError -> m (Either SomeException (TemperatureBundle [resp]))
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (String -> IOError
userError (String -> IOError) -> String -> IOError
forall a b. (a -> b) -> a -> b
$ String
"bidirectionalExperiment: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ DisconnectionException (HandlerError UnversionedProtocol) -> String
forall a. Show a => a -> String
show DisconnectionException (HandlerError UnversionedProtocol)
err)
                  ))

              pure $
                foldr
                  (\ (Either SomeException (TemperatureBundle [resp])
r, TemperatureBundle [resp]
expected) Property
acc ->
                    case Either SomeException (TemperatureBundle [resp])
r of
                      Left SomeException
err -> String -> Bool -> Property
forall prop. Testable prop => String -> prop -> Property
counterexample (SomeException -> String
forall a. Show a => a -> String
show SomeException
err) Bool
False
                      Right TemperatureBundle [resp]
a  -> TemperatureBundle [resp]
a TemperatureBundle [resp] -> TemperatureBundle [resp] -> Property
forall a. (Eq a, Show a) => a -> a -> Property
=== TemperatureBundle [resp]
expected Property -> Property -> Property
forall prop1 prop2.
(Testable prop1, Testable prop2) =>
prop1 -> prop2 -> Property
.&&. Property
acc)
                  (property True)
                  (zip rs0 (expectedResult clientAndServerData0 clientAndServerData1))
                .&&.
                foldr
                  (\ (Either SomeException (TemperatureBundle [resp])
r, TemperatureBundle [resp]
expected) Property
acc ->
                    case Either SomeException (TemperatureBundle [resp])
r of
                      Left SomeException
err -> String -> Bool -> Property
forall prop. Testable prop => String -> prop -> Property
counterexample (SomeException -> String
forall a. Show a => a -> String
show SomeException
err) Bool
False
                      Right TemperatureBundle [resp]
a  -> TemperatureBundle [resp]
a TemperatureBundle [resp] -> TemperatureBundle [resp] -> Property
forall a. (Eq a, Show a) => a -> a -> Property
=== TemperatureBundle [resp]
expected Property -> Property -> Property
forall prop1 prop2.
(Testable prop1, Testable prop2) =>
prop1 -> prop2 -> Property
.&&. Property
acc)
                  (property True)
                  (zip rs1 (expectedResult clientAndServerData1 clientAndServerData0))
                ))


--
-- Utils
--

withLock :: ( MonadSTM   m
            , MonadThrow m
            )
         => Bool
         -> StrictTMVar m ()
         -> m a
         -> m a
withLock :: forall (m :: * -> *) a.
(MonadSTM m, MonadThrow m) =>
Bool -> StrictTMVar m () -> m a -> m a
withLock Bool
False StrictTMVar m ()
_v m a
m = m a
m
withLock Bool
True   StrictTMVar m ()
v m a
m =
    m () -> (() -> m ()) -> (() -> m a) -> m a
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket (STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTMVar m () -> STM m ()
forall (m :: * -> *) a. MonadSTM m => StrictTMVar m a -> STM m a
takeTMVar StrictTMVar m ()
v)
            (STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> (() -> STM m ()) -> () -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StrictTMVar m () -> () -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTMVar m a -> a -> STM m ()
putTMVar StrictTMVar m ()
v)
            (m a -> () -> m a
forall a b. a -> b -> a
const m a
m)