{-# LANGUAGE CPP                 #-}
{-# LANGUAGE ConstraintKinds     #-}
{-# LANGUAGE DataKinds           #-}
{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE GADTs               #-}
{-# LANGUAGE LambdaCase          #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE PolyKinds           #-}
{-# LANGUAGE RankNTypes          #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections       #-}
{-# LANGUAGE TypeApplications    #-}
{-# LANGUAGE TypeOperators       #-}

-- for 'debugTracer'
{-# OPTIONS_GHC -Wno-redundant-constraints #-}
#if __GLASGOW_HASKELL__ >= 908
{-# OPTIONS_GHC -Wno-x-partial #-}
#endif

-- | This module contains experiments which can be executed either in `IO` or
-- in `IOSim`.
--
module Test.Ouroboros.Network.ConnectionManager.Experiments
  ( ClientAndServerData (..)
  , unidirectionalExperiment
  , bidirectionalExperiment
  , ConnectionManagerMonad
  , withInitiatorOnlyConnectionManager
  , withBidirectionalConnectionManager
  , runInitiatorProtocols
  , oneshotNextRequests
  ) where

import Control.Applicative (Alternative)
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Exception (AssertionFailed)
import Control.Monad (replicateM, (>=>))
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadSay
import Control.Monad.Class.MonadST (MonadST)
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadTimer.SI
import Control.Monad.Fix (MonadFix)
import Control.Tracer (Tracer (..), contramap, nullTracer)

import Codec.Serialise.Class (Serialise)
import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy qualified as LBS
import Data.Functor (($>), (<&>))
import Data.List (mapAccumL)
import Data.List.NonEmpty (NonEmpty (..))
import Data.Proxy (Proxy (..))
import Data.Typeable (Typeable)
import Data.Void (Void)

import System.Random (StdGen, split)

import Test.QuickCheck

import Codec.CBOR.Term (Term)

import Network.Mux qualified as Mx
import Network.TypedProtocol.Core
import Network.TypedProtocol.Peer.Client

import Network.TypedProtocol.ReqResp.Client
import Network.TypedProtocol.ReqResp.Codec.CBOR
import Network.TypedProtocol.ReqResp.Examples
import Network.TypedProtocol.ReqResp.Server
import Network.TypedProtocol.ReqResp.Type as ReqResp

import Ouroboros.Network.ConnectionHandler
import Ouroboros.Network.ConnectionId
import Ouroboros.Network.ConnectionManager.Core qualified as CM
import Ouroboros.Network.ConnectionManager.State qualified as CM
import Ouroboros.Network.ConnectionManager.Types
import Ouroboros.Network.Context
import Ouroboros.Network.ControlMessage
import Ouroboros.Network.Driver.Limits
import Ouroboros.Network.InboundGovernor qualified as InboundGovernor
import Ouroboros.Network.Mux
import Ouroboros.Network.MuxMode
import Ouroboros.Network.NodeToNode.Version (DiffusionMode (..))
import Ouroboros.Network.Protocol.Handshake
import Ouroboros.Network.Protocol.Handshake.Codec (cborTermVersionDataCodec,
           noTimeLimitsHandshake, timeLimitsHandshake)
import Ouroboros.Network.Protocol.Handshake.Type (Handshake)
import Ouroboros.Network.Protocol.Handshake.Unversioned
import Ouroboros.Network.Protocol.Handshake.Version (Acceptable (..),
           Queryable (..))
import Ouroboros.Network.RethrowPolicy
import Ouroboros.Network.Server.RateLimiting (AcceptedConnectionsLimit (..))
import Ouroboros.Network.Server2 (RemoteTransitionTrace)
import Ouroboros.Network.Server2 qualified as Server
import Ouroboros.Network.Snocket (Snocket)
import Ouroboros.Network.Snocket qualified as Snocket

import Test.Ouroboros.Network.ConnectionManager.Timeouts
import Test.Ouroboros.Network.Orphans ()
import Test.Ouroboros.Network.Utils (WithName (..))

import Ouroboros.Network.ConnectionManager.InformationChannel
           (newInformationChannel)


--
-- Server tests
--

-- | The protocol will run three instances of  `ReqResp` protocol; one for each
-- state: warm, hot and established.
--
data ClientAndServerData req = ClientAndServerData {
    forall req. ClientAndServerData req -> req
accumulatorInit              :: req,
    -- ^ Initial value. In for each request the server sends back a list received requests (in
    --   reverse order) terminating with the accumulatorInit.
    forall req. ClientAndServerData req -> [[req]]
hotInitiatorRequests         :: [[req]],
    -- ^ list of requests run by the hot initiator in each round; Running
    -- multiple rounds allows us to test restarting of responders.
    forall req. ClientAndServerData req -> [[req]]
warmInitiatorRequests        :: [[req]],
    -- ^ list of requests run by the warm initiator in each round
    forall req. ClientAndServerData req -> [[req]]
establishedInitiatorRequests :: [[req]]
    -- ^ list of requests run by the established initiator in each round
  }
  deriving Int -> ClientAndServerData req -> ShowS
[ClientAndServerData req] -> ShowS
ClientAndServerData req -> String
(Int -> ClientAndServerData req -> ShowS)
-> (ClientAndServerData req -> String)
-> ([ClientAndServerData req] -> ShowS)
-> Show (ClientAndServerData req)
forall req. Show req => Int -> ClientAndServerData req -> ShowS
forall req. Show req => [ClientAndServerData req] -> ShowS
forall req. Show req => ClientAndServerData req -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: forall req. Show req => Int -> ClientAndServerData req -> ShowS
showsPrec :: Int -> ClientAndServerData req -> ShowS
$cshow :: forall req. Show req => ClientAndServerData req -> String
show :: ClientAndServerData req -> String
$cshowList :: forall req. Show req => [ClientAndServerData req] -> ShowS
showList :: [ClientAndServerData req] -> ShowS
Show


-- Number of rounds to exhaust all the requests.
--
numberOfRounds :: ClientAndServerData req ->  Int
numberOfRounds :: forall req. ClientAndServerData req -> Int
numberOfRounds ClientAndServerData {
                  [[req]]
hotInitiatorRequests :: forall req. ClientAndServerData req -> [[req]]
hotInitiatorRequests :: [[req]]
hotInitiatorRequests,
                  [[req]]
warmInitiatorRequests :: forall req. ClientAndServerData req -> [[req]]
warmInitiatorRequests :: [[req]]
warmInitiatorRequests,
                  [[req]]
establishedInitiatorRequests :: forall req. ClientAndServerData req -> [[req]]
establishedInitiatorRequests :: [[req]]
establishedInitiatorRequests
                } =
    [[req]] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [[req]]
hotInitiatorRequests
    Int -> Int -> Int
forall a. Ord a => a -> a -> a
`max`
    [[req]] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [[req]]
warmInitiatorRequests
    Int -> Int -> Int
forall a. Ord a => a -> a -> a
`max`
    [[req]] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [[req]]
establishedInitiatorRequests


-- | We use it to generate a list of messages for a list of rounds.  In each
-- round we connect to the same server, and run 'ReqResp' protocol.
--
arbitraryList :: Arbitrary a =>  Gen [[a]]
arbitraryList :: forall a. Arbitrary a => Gen [[a]]
arbitraryList =
    Int -> Gen [[a]] -> Gen [[a]]
forall a. Int -> Gen a -> Gen a
resize Int
3 (Gen [a] -> Gen [[a]]
forall a. Gen a -> Gen [a]
listOf (Int -> Gen [a] -> Gen [a]
forall a. Int -> Gen a -> Gen a
resize Int
3 (Gen a -> Gen [a]
forall a. Gen a -> Gen [a]
listOf (Int -> Gen a -> Gen a
forall a. Int -> Gen a -> Gen a
resize Int
100 Gen a
forall a. Arbitrary a => Gen a
arbitrary))))

instance Arbitrary req => Arbitrary (ClientAndServerData req) where
    arbitrary :: Gen (ClientAndServerData req)
arbitrary =
      req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
forall req.
req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
ClientAndServerData (req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req)
-> Gen req
-> Gen ([[req]] -> [[req]] -> [[req]] -> ClientAndServerData req)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Gen req
forall a. Arbitrary a => Gen a
arbitrary
                          Gen ([[req]] -> [[req]] -> [[req]] -> ClientAndServerData req)
-> Gen [[req]]
-> Gen ([[req]] -> [[req]] -> ClientAndServerData req)
forall a b. Gen (a -> b) -> Gen a -> Gen b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Gen [[req]]
forall a. Arbitrary a => Gen [[a]]
arbitraryList
                          Gen ([[req]] -> [[req]] -> ClientAndServerData req)
-> Gen [[req]] -> Gen ([[req]] -> ClientAndServerData req)
forall a b. Gen (a -> b) -> Gen a -> Gen b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Gen [[req]]
forall a. Arbitrary a => Gen [[a]]
arbitraryList
                          Gen ([[req]] -> ClientAndServerData req)
-> Gen [[req]] -> Gen (ClientAndServerData req)
forall a b. Gen (a -> b) -> Gen a -> Gen b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Gen [[req]]
forall a. Arbitrary a => Gen [[a]]
arbitraryList

    shrink :: ClientAndServerData req -> [ClientAndServerData req]
shrink (ClientAndServerData req
ini [[req]]
hot [[req]]
warm [[req]]
est) = [[ClientAndServerData req]] -> [ClientAndServerData req]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat
      [ req -> [req]
forall a. Arbitrary a => a -> [a]
shrink req
ini  [req]
-> (req -> ClientAndServerData req) -> [ClientAndServerData req]
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \ req
ini'  -> req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
forall req.
req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
ClientAndServerData req
ini' [[req]]
hot  [[req]]
warm  [[req]]
est
      , [[req]] -> [[[req]]]
forall a. Arbitrary a => a -> [a]
shrink [[req]]
hot  [[[req]]]
-> ([[req]] -> ClientAndServerData req)
-> [ClientAndServerData req]
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \ [[req]]
hot'  -> req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
forall req.
req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
ClientAndServerData req
ini  [[req]]
hot' [[req]]
warm  [[req]]
est
      , [[req]] -> [[[req]]]
forall a. Arbitrary a => a -> [a]
shrink [[req]]
warm [[[req]]]
-> ([[req]] -> ClientAndServerData req)
-> [ClientAndServerData req]
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \ [[req]]
warm' -> req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
forall req.
req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
ClientAndServerData req
ini  [[req]]
hot  [[req]]
warm' [[req]]
est
      , [[req]] -> [[[req]]]
forall a. Arbitrary a => a -> [a]
shrink [[req]]
est  [[[req]]]
-> ([[req]] -> ClientAndServerData req)
-> [ClientAndServerData req]
forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \ [[req]]
est'  -> req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
forall req.
req -> [[req]] -> [[req]] -> [[req]] -> ClientAndServerData req
ClientAndServerData req
ini  [[req]]
hot  [[req]]
warm  [[req]]
est'
      ]

expectedResult :: ClientAndServerData req
               -> ClientAndServerData req
               -> [TemperatureBundle [[req]]]
expectedResult :: forall req.
ClientAndServerData req
-> ClientAndServerData req -> [TemperatureBundle [[req]]]
expectedResult client :: ClientAndServerData req
client@ClientAndServerData
                                   { [[req]]
hotInitiatorRequests :: forall req. ClientAndServerData req -> [[req]]
hotInitiatorRequests :: [[req]]
hotInitiatorRequests
                                   , [[req]]
warmInitiatorRequests :: forall req. ClientAndServerData req -> [[req]]
warmInitiatorRequests :: [[req]]
warmInitiatorRequests
                                   , [[req]]
establishedInitiatorRequests :: forall req. ClientAndServerData req -> [[req]]
establishedInitiatorRequests :: [[req]]
establishedInitiatorRequests
                                   }
               ClientAndServerData { req
accumulatorInit :: forall req. ClientAndServerData req -> req
accumulatorInit :: req
accumulatorInit
                                   } =
    [[req]] -> [[req]] -> [[req]] -> [TemperatureBundle [[req]]]
go
      (Int -> [[req]] -> [[req]]
forall a. Int -> [a] -> [a]
take Int
rounds ([[req]] -> [[req]]) -> [[req]] -> [[req]]
forall a b. (a -> b) -> a -> b
$ [[req]]
hotInitiatorRequests         [[req]] -> [[req]] -> [[req]]
forall a. [a] -> [a] -> [a]
++ [req] -> [[req]]
forall a. a -> [a]
repeat [])
      (Int -> [[req]] -> [[req]]
forall a. Int -> [a] -> [a]
take Int
rounds ([[req]] -> [[req]]) -> [[req]] -> [[req]]
forall a b. (a -> b) -> a -> b
$ [[req]]
warmInitiatorRequests        [[req]] -> [[req]] -> [[req]]
forall a. [a] -> [a] -> [a]
++ [req] -> [[req]]
forall a. a -> [a]
repeat [])
      (Int -> [[req]] -> [[req]]
forall a. Int -> [a] -> [a]
take Int
rounds ([[req]] -> [[req]]) -> [[req]] -> [[req]]
forall a b. (a -> b) -> a -> b
$ [[req]]
establishedInitiatorRequests [[req]] -> [[req]] -> [[req]]
forall a. [a] -> [a] -> [a]
++ [req] -> [[req]]
forall a. a -> [a]
repeat [])
  where
    rounds :: Int
rounds = ClientAndServerData req -> Int
forall req. ClientAndServerData req -> Int
numberOfRounds ClientAndServerData req
client
    fn :: [a] -> a -> ([a], [a])
fn [a]
acc a
x = (a
x a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
acc, a
x a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
acc)
    go :: [[req]] -> [[req]] -> [[req]] -> [TemperatureBundle [[req]]]
go ([req]
a : [[req]]
as) ([req]
b : [[req]]
bs) ([req]
c : [[req]]
cs) =
      WithProtocolTemperature 'Hot [[req]]
-> WithProtocolTemperature 'Warm [[req]]
-> WithProtocolTemperature 'Established [[req]]
-> TemperatureBundle [[req]]
forall a.
WithProtocolTemperature 'Hot a
-> WithProtocolTemperature 'Warm a
-> WithProtocolTemperature 'Established a
-> TemperatureBundle a
TemperatureBundle
        ([[req]] -> WithProtocolTemperature 'Hot [[req]]
forall a. a -> WithProtocolTemperature 'Hot a
WithHot         (([req], [[req]]) -> [[req]]
forall a b. (a, b) -> b
snd (([req], [[req]]) -> [[req]]) -> ([req], [[req]]) -> [[req]]
forall a b. (a -> b) -> a -> b
$ ([req] -> req -> ([req], [req]))
-> [req] -> [req] -> ([req], [[req]])
forall (t :: * -> *) s a b.
Traversable t =>
(s -> a -> (s, b)) -> s -> t a -> (s, t b)
mapAccumL [req] -> req -> ([req], [req])
forall {a}. [a] -> a -> ([a], [a])
fn [req
accumulatorInit] [req]
a))
        ([[req]] -> WithProtocolTemperature 'Warm [[req]]
forall a. a -> WithProtocolTemperature 'Warm a
WithWarm        (([req], [[req]]) -> [[req]]
forall a b. (a, b) -> b
snd (([req], [[req]]) -> [[req]]) -> ([req], [[req]]) -> [[req]]
forall a b. (a -> b) -> a -> b
$ ([req] -> req -> ([req], [req]))
-> [req] -> [req] -> ([req], [[req]])
forall (t :: * -> *) s a b.
Traversable t =>
(s -> a -> (s, b)) -> s -> t a -> (s, t b)
mapAccumL [req] -> req -> ([req], [req])
forall {a}. [a] -> a -> ([a], [a])
fn [req
accumulatorInit] [req]
b))
        ([[req]] -> WithProtocolTemperature 'Established [[req]]
forall a. a -> WithProtocolTemperature 'Established a
WithEstablished (([req], [[req]]) -> [[req]]
forall a b. (a, b) -> b
snd (([req], [[req]]) -> [[req]]) -> ([req], [[req]]) -> [[req]]
forall a b. (a -> b) -> a -> b
$ ([req] -> req -> ([req], [req]))
-> [req] -> [req] -> ([req], [[req]])
forall (t :: * -> *) s a b.
Traversable t =>
(s -> a -> (s, b)) -> s -> t a -> (s, t b)
mapAccumL [req] -> req -> ([req], [req])
forall {a}. [a] -> a -> ([a], [a])
fn [req
accumulatorInit] [req]
c))
      TemperatureBundle [[req]]
-> [TemperatureBundle [[req]]] -> [TemperatureBundle [[req]]]
forall a. a -> [a] -> [a]
: [[req]] -> [[req]] -> [[req]] -> [TemperatureBundle [[req]]]
go [[req]]
as [[req]]
bs [[req]]
cs
    go [] [] [] = []
    go [[req]]
_  [[req]]
_  [[req]]
_  = String -> [TemperatureBundle [[req]]]
forall a. HasCallStack => String -> a
error String
"expectedResult: impossible happened"

noNextRequests :: forall stm req peerAddr. Applicative stm => TemperatureBundle (ConnectionId peerAddr -> stm [req])
noNextRequests :: forall (stm :: * -> *) req peerAddr.
Applicative stm =>
TemperatureBundle (ConnectionId peerAddr -> stm [req])
noNextRequests = (ConnectionId peerAddr -> stm [req])
-> TemperatureBundle (ConnectionId peerAddr -> stm [req])
forall a. a -> TemperatureBundle a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((ConnectionId peerAddr -> stm [req])
 -> TemperatureBundle (ConnectionId peerAddr -> stm [req]))
-> (ConnectionId peerAddr -> stm [req])
-> TemperatureBundle (ConnectionId peerAddr -> stm [req])
forall a b. (a -> b) -> a -> b
$ \ConnectionId peerAddr
_ -> [req] -> stm [req]
forall a. a -> stm a
forall (f :: * -> *) a. Applicative f => a -> f a
pure []

-- | Next requests bundle for bidirectional and unidirectional experiments.
oneshotNextRequests
  :: forall req peerAddr m. MonadSTM m
  => ClientAndServerData req
  -> m (TemperatureBundle (ConnectionId peerAddr -> STM m [req]))
oneshotNextRequests :: forall req peerAddr (m :: * -> *).
MonadSTM m =>
ClientAndServerData req
-> m (TemperatureBundle (ConnectionId peerAddr -> STM m [req]))
oneshotNextRequests ClientAndServerData {
                      [[req]]
hotInitiatorRequests :: forall req. ClientAndServerData req -> [[req]]
hotInitiatorRequests :: [[req]]
hotInitiatorRequests,
                      [[req]]
warmInitiatorRequests :: forall req. ClientAndServerData req -> [[req]]
warmInitiatorRequests :: [[req]]
warmInitiatorRequests,
                      [[req]]
establishedInitiatorRequests :: forall req. ClientAndServerData req -> [[req]]
establishedInitiatorRequests :: [[req]]
establishedInitiatorRequests
                    } = do
    -- we pass a `StrictTVar` with all the requests to each initiator.  This way
    -- the each round (which runs a single instance of `ReqResp` protocol) will
    -- use its own request list.
    hotRequestsVar         <- [[req]] -> m (StrictTVar m [[req]])
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO [[req]]
hotInitiatorRequests
    warmRequestsVar        <- newTVarIO warmInitiatorRequests
    establishedRequestsVar <- newTVarIO establishedInitiatorRequests
    return $ TemperatureBundle
               (WithHot hotRequestsVar)
               (WithWarm warmRequestsVar)
               (WithEstablished establishedRequestsVar)
              <&> \ StrictTVar m [[req]]
reqVar ConnectionId peerAddr
_ -> StrictTVar m [[req]] -> STM m [req]
forall {m :: * -> *} {a}.
MonadSTM m =>
StrictTVar m [[a]] -> STM m [a]
popRequests StrictTVar m [[req]]
reqVar
  where
    popRequests :: StrictTVar m [[a]] -> STM m [a]
popRequests StrictTVar m [[a]]
requestsVar = do
      requests <- StrictTVar m [[a]] -> STM m [[a]]
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m [[a]]
requestsVar
      case requests of
        [a]
reqs : [[a]]
rest -> StrictTVar m [[a]] -> [[a]] -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m [[a]]
requestsVar [[a]]
rest STM m () -> [a] -> STM m [a]
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> [a]
reqs
        []          -> [a] -> STM m [a]
forall a. a -> STM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure []


--
-- Various ConnectionManagers
--

type ConnectionManagerMonad m =
       ( Alternative (STM m), MonadAsync m, MonadCatch m, MonadEvaluate m,
         MonadFork m, MonadMask  m, MonadST m, MonadTime m, MonadTimer m,
         MonadThrow m, MonadThrow (STM m)
       )


withInitiatorOnlyConnectionManager
    :: forall name peerAddr socket req resp m a.
       ( ConnectionManagerMonad m

       , resp ~ [req]
       , Ord peerAddr, Show peerAddr, Typeable peerAddr
       , Serialise req, Typeable req
       , MonadAsync m
       , MonadDelay m
       , MonadFix m
       , MonadLabelledSTM m
       , MonadTraceSTM m
       , MonadSay m, Show req
       , Show name
       )
    => name
    -- ^ identifier (for logging)
    -> Timeouts
    -> Tracer m (WithName name (AbstractTransitionTrace CM.ConnStateId))
    -> Tracer m (WithName name
                          (CM.Trace
                            peerAddr
                            (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
    -> StdGen
    -> Snocket m socket peerAddr
    -> Mx.MakeBearer m socket
    -- ^ series of request possible to do with the bidirectional connection
    -- manager towards some peer.
    -> CM.ConnStateIdSupply m
    -> Maybe peerAddr
    -> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
    -- ^ Functions to get the next requests for a given connection
    -> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
    -- ^ Handshake time limits
    -> AcceptedConnectionsLimit
    -> (ConnectionManagerWithExpandedCtx
          Mx.InitiatorMode socket peerAddr
          DataFlowProtocolData UnversionedProtocol ByteString m [resp] Void
       -> m a)
    -> m a
withInitiatorOnlyConnectionManager :: forall name peerAddr socket req resp (m :: * -> *) a.
(ConnectionManagerMonad m, resp ~ [req], Ord peerAddr,
 Show peerAddr, Typeable peerAddr, Serialise req, Typeable req,
 MonadAsync m, MonadDelay m, MonadFix m, MonadLabelledSTM m,
 MonadTraceSTM m, MonadSay m, Show req, Show name) =>
name
-> Timeouts
-> Tracer m (WithName name (AbstractTransitionTrace ConnStateId))
-> Tracer
     m
     (WithName
        name
        (Trace
           peerAddr
           (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
-> StdGen
-> Snocket m socket peerAddr
-> MakeBearer m socket
-> ConnStateIdSupply m
-> Maybe peerAddr
-> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
-> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
-> AcceptedConnectionsLimit
-> (ConnectionManagerWithExpandedCtx
      'InitiatorMode
      socket
      peerAddr
      DataFlowProtocolData
      UnversionedProtocol
      ByteString
      m
      [resp]
      Void
    -> m a)
-> m a
withInitiatorOnlyConnectionManager name
name Timeouts
timeouts Tracer m (WithName name (AbstractTransitionTrace ConnStateId))
trTracer Tracer
  m
  (WithName
     name
     (Trace
        peerAddr
        (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
tracer StdGen
stdGen Snocket m socket peerAddr
snocket MakeBearer m socket
makeBearer ConnStateIdSupply m
connStateIdSupply
                                   Maybe peerAddr
localAddr TemperatureBundle (ConnectionId peerAddr -> STM m [req])
nextRequests ProtocolTimeLimits (Handshake UnversionedProtocol Term)
handshakeTimeLimits AcceptedConnectionsLimit
acceptedConnLimit ConnectionManagerWithExpandedCtx
  'InitiatorMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  Void
-> m a
k = do
    mainThreadId <- m (ThreadId m)
forall (m :: * -> *). MonadThread m => m (ThreadId m)
myThreadId
    let muxTracer = (name
name,) (WithBearer (ConnectionId peerAddr) Trace
 -> (name, WithBearer (ConnectionId peerAddr) Trace))
-> Tracer m (name, WithBearer (ConnectionId peerAddr) Trace)
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace)
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer m (name, WithBearer (ConnectionId peerAddr) Trace)
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer -- mux tracer
    CM.with
      CM.Arguments {
          -- ConnectionManagerTrace
          CM.tracer    = WithName name
                        `contramap` tracer,
          CM.trTracer  = (WithName name . fmap CM.abstractState)
                        `contramap` trTracer,
         -- MuxTracer
          CM.muxTracer = muxTracer,
          CM.ipv4Address = localAddr,
          CM.ipv6Address = Nothing,
          CM.addressType = \peerAddr
_ -> AddressType -> Maybe AddressType
forall a. a -> Maybe a
Just AddressType
IPv4Address,
          CM.snocket = snocket,
          CM.makeBearer = makeBearer,
          CM.configureSocket = \socket
_ Maybe peerAddr
_ -> () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (),
          CM.connectionDataFlow = \(DataFlowProtocolData DataFlow
df PeerSharing
_) -> DataFlow
df,
          CM.prunePolicy = simplePrunePolicy,
          CM.stdGen,
          CM.connectionsLimits = acceptedConnLimit,
          CM.timeWaitTimeout = tTimeWaitTimeout timeouts,
          CM.outboundIdleTimeout = tOutboundIdleTimeout timeouts,
          CM.updateVersionData = \DataFlowProtocolData
a DiffusionMode
_ -> DataFlowProtocolData
a,
          CM.connStateIdSupply
        }
      (makeConnectionHandler
        muxTracer
        SingInitiatorMode
        HandshakeArguments {
            -- TraceSendRecv
            haHandshakeTracer = (name,) `contramap` nullTracer,
            haHandshakeCodec = unversionedHandshakeCodec,
            haVersionDataCodec = cborTermVersionDataCodec dataFlowProtocolDataCodec,
            haAcceptVersion = acceptableVersion,
            haQueryVersion = queryVersion,
            haTimeLimits = handshakeTimeLimits
          }
        (dataFlowProtocol Unidirectional clientApplication)
        (mainThreadId, debugMuxErrorRethrowPolicy
                    <> debugMuxRuntimeErrorRethrowPolicy
                    <> debugIOErrorRethrowPolicy
                    <> assertRethrowPolicy))
      (\HandleError 'InitiatorMode UnversionedProtocol
_ -> HandleErrorType
HandshakeFailure)
      NotInResponderMode
      (\ConnectionManagerWithExpandedCtx
  'InitiatorMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  Void
cm ->
        ConnectionManagerWithExpandedCtx
  'InitiatorMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  Void
-> m a
k ConnectionManagerWithExpandedCtx
  'InitiatorMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  Void
cm m a -> (SomeException -> m a) -> m a
forall e a. Exception e => m a -> (e -> m a) -> m a
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` \(SomeException
e :: SomeException) -> SomeException -> m a
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SomeException
e)
  where
    clientApplication :: TemperatureBundle
                           [MiniProtocol Mx.InitiatorMode
                                         (ExpandedInitiatorContext peerAddr m)
                                         (ResponderContext peerAddr)
                                         ByteString m [resp] Void]
    clientApplication :: OuroborosBundle
  'InitiatorMode
  (ExpandedInitiatorContext peerAddr m)
  (ResponderContext peerAddr)
  ByteString
  m
  [resp]
  Void
clientApplication = MiniProtocolNum
-> (ConnectionId peerAddr -> STM m [req])
-> [MiniProtocol
      'InitiatorMode
      (ExpandedInitiatorContext peerAddr m)
      (ResponderContext peerAddr)
      ByteString
      m
      [resp]
      Void]
mkProto (MiniProtocolNum
 -> (ConnectionId peerAddr -> STM m [req])
 -> [MiniProtocol
       'InitiatorMode
       (ExpandedInitiatorContext peerAddr m)
       (ResponderContext peerAddr)
       ByteString
       m
       [resp]
       Void])
-> TemperatureBundle MiniProtocolNum
-> TemperatureBundle
     ((ConnectionId peerAddr -> STM m [req])
      -> [MiniProtocol
            'InitiatorMode
            (ExpandedInitiatorContext peerAddr m)
            (ResponderContext peerAddr)
            ByteString
            m
            [resp]
            Void])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Word16 -> MiniProtocolNum
Mx.MiniProtocolNum (Word16 -> MiniProtocolNum)
-> TemperatureBundle Word16 -> TemperatureBundle MiniProtocolNum
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TemperatureBundle Word16
nums)
                                TemperatureBundle
  ((ConnectionId peerAddr -> STM m [req])
   -> [MiniProtocol
         'InitiatorMode
         (ExpandedInitiatorContext peerAddr m)
         (ResponderContext peerAddr)
         ByteString
         m
         [resp]
         Void])
-> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
-> OuroborosBundle
     'InitiatorMode
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     ByteString
     m
     [resp]
     Void
forall a b.
TemperatureBundle (a -> b)
-> TemperatureBundle a -> TemperatureBundle b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
nextRequests

      where nums :: TemperatureBundle Word16
nums = WithProtocolTemperature 'Hot Word16
-> WithProtocolTemperature 'Warm Word16
-> WithProtocolTemperature 'Established Word16
-> TemperatureBundle Word16
forall a.
WithProtocolTemperature 'Hot a
-> WithProtocolTemperature 'Warm a
-> WithProtocolTemperature 'Established a
-> TemperatureBundle a
TemperatureBundle (Word16 -> WithProtocolTemperature 'Hot Word16
forall a. a -> WithProtocolTemperature 'Hot a
WithHot Word16
1) (Word16 -> WithProtocolTemperature 'Warm Word16
forall a. a -> WithProtocolTemperature 'Warm a
WithWarm Word16
2) (Word16 -> WithProtocolTemperature 'Established Word16
forall a. a -> WithProtocolTemperature 'Established a
WithEstablished Word16
3)
            mkProto :: MiniProtocolNum
-> (ConnectionId peerAddr -> STM m [req])
-> [MiniProtocol
      'InitiatorMode
      (ExpandedInitiatorContext peerAddr m)
      (ResponderContext peerAddr)
      ByteString
      m
      [resp]
      Void]
mkProto MiniProtocolNum
miniProtocolNum ConnectionId peerAddr -> STM m [req]
nextRequest =
              [MiniProtocol {
                  MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum,
                  miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits = Int -> MiniProtocolLimits
Mx.MiniProtocolLimits Int
forall a. Bounded a => a
maxBound,
                  miniProtocolRun :: RunMiniProtocol
  'InitiatorMode
  (ExpandedInitiatorContext peerAddr m)
  (ResponderContext peerAddr)
  ByteString
  m
  [resp]
  Void
miniProtocolRun = MiniProtocolNum
-> (ConnectionId peerAddr -> STM m [req])
-> RunMiniProtocol
     'InitiatorMode
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     ByteString
     m
     [resp]
     Void
reqRespInitiator MiniProtocolNum
miniProtocolNum
                                                     ConnectionId peerAddr -> STM m [req]
nextRequest
                }]

    reqRespInitiator :: Mx.MiniProtocolNum
                     -> (ConnectionId peerAddr -> STM m [req])
                     -> RunMiniProtocol Mx.InitiatorMode
                                        (ExpandedInitiatorContext peerAddr m)
                                        (ResponderContext peerAddr)
                                        ByteString m [resp] Void
    reqRespInitiator :: MiniProtocolNum
-> (ConnectionId peerAddr -> STM m [req])
-> RunMiniProtocol
     'InitiatorMode
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     ByteString
     m
     [resp]
     Void
reqRespInitiator MiniProtocolNum
protocolNum ConnectionId peerAddr -> STM m [req]
nextRequest =
      MiniProtocolCb
  (ExpandedInitiatorContext peerAddr m) ByteString m [resp]
-> RunMiniProtocol
     'InitiatorMode
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     ByteString
     m
     [resp]
     Void
forall initiatorCtx bytes (m :: * -> *) a responderCtx.
MiniProtocolCb initiatorCtx bytes m a
-> RunMiniProtocol
     'InitiatorMode initiatorCtx responderCtx bytes m a Void
InitiatorProtocolOnly
        ((ExpandedInitiatorContext peerAddr m
 -> Channel m ByteString -> m ([resp], Maybe ByteString))
-> MiniProtocolCb
     (ExpandedInitiatorContext peerAddr m) ByteString m [resp]
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb ((ExpandedInitiatorContext peerAddr m
  -> Channel m ByteString -> m ([resp], Maybe ByteString))
 -> MiniProtocolCb
      (ExpandedInitiatorContext peerAddr m) ByteString m [resp])
-> (ExpandedInitiatorContext peerAddr m
    -> Channel m ByteString -> m ([resp], Maybe ByteString))
-> MiniProtocolCb
     (ExpandedInitiatorContext peerAddr m) ByteString m [resp]
forall a b. (a -> b) -> a -> b
$ \ExpandedInitiatorContext { eicConnectionId :: forall addr (m :: * -> *).
ExpandedInitiatorContext addr m -> ConnectionId addr
eicConnectionId = ConnectionId peerAddr
connId } Channel m ByteString
channel ->
           Tracer m (TraceSendRecv (ReqResp req resp))
-> Codec (ReqResp req resp) DeserialiseFailure m ByteString
-> ProtocolSizeLimits (ReqResp req resp) ByteString
-> ProtocolTimeLimits (ReqResp req resp)
-> Channel m ByteString
-> Peer (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp]
-> m ([resp], Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
       a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
 MonadTimer m, ShowProxy ps,
 forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
 Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runPeerWithLimits
             ((name, String, MiniProtocolNum)
-> TraceSendRecv (ReqResp req resp)
-> WithName
     (name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp))
forall name event. name -> event -> WithName name event
WithName (name
name,String
"Initiator",MiniProtocolNum
protocolNum) (TraceSendRecv (ReqResp req resp)
 -> WithName
      (name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp)))
-> Tracer
     m
     (WithName
        (name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp)))
-> Tracer m (TraceSendRecv (ReqResp req resp))
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer
  m
  (WithName
     (name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp)))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer)
             -- TraceSendRecv
             Codec (ReqResp req resp) DeserialiseFailure m ByteString
forall req resp (m :: * -> *).
(MonadST m, Serialise req, Serialise resp) =>
Codec (ReqResp req resp) DeserialiseFailure m ByteString
codecReqResp
             ProtocolSizeLimits (ReqResp req resp) ByteString
forall {k} {k1} (req :: k) (resp :: k1).
ProtocolSizeLimits (ReqResp req resp) ByteString
reqRespSizeLimits
             ProtocolTimeLimits (ReqResp req resp)
forall {k} {k1} (req :: k) (resp :: k1).
ProtocolTimeLimits (ReqResp req resp)
reqRespTimeLimits
             Channel m ByteString
channel
             (m (Peer
     (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp])
-> Peer (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp]
forall ps (pl :: IsPipelined) (st :: ps) (m :: * -> *) a.
m (Client ps pl st m a) -> Client ps pl st m a
Effect (m (Peer
      (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp])
 -> Peer
      (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp])
-> m (Peer
        (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp])
-> Peer (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp]
forall a b. (a -> b) -> a -> b
$ do
               reqs <- STM m [req] -> m [req]
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (ConnectionId peerAddr -> STM m [req]
nextRequest ConnectionId peerAddr
connId)
               pure $ reqRespClientPeer (reqRespClientMap reqs)))


--
-- Rethrow policies
--

debugMuxErrorRethrowPolicy :: RethrowPolicy
debugMuxErrorRethrowPolicy :: RethrowPolicy
debugMuxErrorRethrowPolicy =
    (ErrorContext -> Error -> ErrorCommand) -> RethrowPolicy
forall e.
Exception e =>
(ErrorContext -> e -> ErrorCommand) -> RethrowPolicy
mkRethrowPolicy ((ErrorContext -> Error -> ErrorCommand) -> RethrowPolicy)
-> (ErrorContext -> Error -> ErrorCommand) -> RethrowPolicy
forall a b. (a -> b) -> a -> b
$
    \ErrorContext
_ Error
e ->
        case Error
e of
          Mx.IOException {}  -> ErrorCommand
ShutdownPeer
          Mx.BearerClosed {} -> ErrorCommand
ShutdownPeer
          Error
Mx.SDUReadTimeout  -> ErrorCommand
ShutdownPeer
          Error
Mx.SDUWriteTimeout -> ErrorCommand
ShutdownPeer
          Error
_                  -> ErrorCommand
ShutdownNode

debugMuxRuntimeErrorRethrowPolicy :: RethrowPolicy
debugMuxRuntimeErrorRethrowPolicy :: RethrowPolicy
debugMuxRuntimeErrorRethrowPolicy =
    (ErrorContext -> RuntimeError -> ErrorCommand) -> RethrowPolicy
forall e.
Exception e =>
(ErrorContext -> e -> ErrorCommand) -> RethrowPolicy
mkRethrowPolicy ((ErrorContext -> RuntimeError -> ErrorCommand) -> RethrowPolicy)
-> (ErrorContext -> RuntimeError -> ErrorCommand) -> RethrowPolicy
forall a b. (a -> b) -> a -> b
$
      \ErrorContext
_ (RuntimeError
_ :: Mx.RuntimeError) -> ErrorCommand
ShutdownPeer

debugIOErrorRethrowPolicy :: RethrowPolicy
debugIOErrorRethrowPolicy :: RethrowPolicy
debugIOErrorRethrowPolicy =
    (ErrorContext -> IOError -> ErrorCommand) -> RethrowPolicy
forall e.
Exception e =>
(ErrorContext -> e -> ErrorCommand) -> RethrowPolicy
mkRethrowPolicy ((ErrorContext -> IOError -> ErrorCommand) -> RethrowPolicy)
-> (ErrorContext -> IOError -> ErrorCommand) -> RethrowPolicy
forall a b. (a -> b) -> a -> b
$
      \ErrorContext
_ (IOError
_ :: IOError) -> ErrorCommand
ShutdownPeer


assertRethrowPolicy :: RethrowPolicy
assertRethrowPolicy :: RethrowPolicy
assertRethrowPolicy =
    (ErrorContext -> AssertionFailed -> ErrorCommand) -> RethrowPolicy
forall e.
Exception e =>
(ErrorContext -> e -> ErrorCommand) -> RethrowPolicy
mkRethrowPolicy ((ErrorContext -> AssertionFailed -> ErrorCommand)
 -> RethrowPolicy)
-> (ErrorContext -> AssertionFailed -> ErrorCommand)
-> RethrowPolicy
forall a b. (a -> b) -> a -> b
$
      \ErrorContext
_ (AssertionFailed
_ :: AssertionFailed) -> ErrorCommand
ShutdownNode


-- | Runs an example server which runs a single 'ReqResp' protocol for any hot
-- \/ warm \/ established peers and also gives access to bidirectional
-- 'ConnectionManager'.  This gives a way to connect to other peers.
-- Slightly unfortunate design decision does not give us a way to create
-- a client per connection.  This means that this connection manager takes list
-- of 'req' type which it will make to the other side (they will be multiplexed
-- across warm \/ how \/ established) protocols.
--
withBidirectionalConnectionManager
    :: forall name peerAddr socket acc req resp m a.
       ( ConnectionManagerMonad m

       , acc ~ [req], resp ~ [req]
       , Ord peerAddr, Show peerAddr, Typeable peerAddr
       , Serialise req, Typeable req

       -- debugging
       , MonadAsync m
       , MonadDelay m
       , MonadFix m
       , MonadLabelledSTM m
       , MonadTraceSTM m
       , MonadSay m, Show req
       , Show name
       )
    => name
    -> Timeouts
    -- ^ identifier (for logging)
    -> Tracer m (WithName name (RemoteTransitionTrace peerAddr))
    -> Tracer m (WithName name (AbstractTransitionTrace CM.ConnStateId))
    -> Tracer m (WithName name
                          (CM.Trace
                            peerAddr
                            (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
    -> Tracer m (WithName name (InboundGovernor.Trace peerAddr))
    -> Tracer m (WithName name (InboundGovernor.Debug peerAddr DataFlowProtocolData))
    -> StdGen
    -> Snocket m socket peerAddr
    -> Mx.MakeBearer m socket
    -> CM.ConnStateIdSupply m
    -> (socket -> m ()) -- ^ configure socket
    -> socket
    -- ^ listening socket
    -> Maybe peerAddr
    -> acc
    -- ^ Initial state for the server
    -> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
    -- ^ Functions to get the next requests for a given connection
    -- ^ series of request possible to do with the bidirectional connection
    -- manager towards some peer.
    -> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
    -- ^ Handshake time limits
    -> AcceptedConnectionsLimit
    -> (ConnectionManagerWithExpandedCtx
          Mx.InitiatorResponderMode socket peerAddr
          DataFlowProtocolData UnversionedProtocol ByteString m [resp] acc
       -> peerAddr
       -> Async m Void
       -> m a)
    -> m a
withBidirectionalConnectionManager :: forall name peerAddr socket acc req resp (m :: * -> *) a.
(ConnectionManagerMonad m, acc ~ [req], resp ~ [req], Ord peerAddr,
 Show peerAddr, Typeable peerAddr, Serialise req, Typeable req,
 MonadAsync m, MonadDelay m, MonadFix m, MonadLabelledSTM m,
 MonadTraceSTM m, MonadSay m, Show req, Show name) =>
name
-> Timeouts
-> Tracer m (WithName name (RemoteTransitionTrace peerAddr))
-> Tracer m (WithName name (AbstractTransitionTrace ConnStateId))
-> Tracer
     m
     (WithName
        name
        (Trace
           peerAddr
           (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
-> Tracer m (WithName name (Trace peerAddr))
-> Tracer m (WithName name (Debug peerAddr DataFlowProtocolData))
-> StdGen
-> Snocket m socket peerAddr
-> MakeBearer m socket
-> ConnStateIdSupply m
-> (socket -> m ())
-> socket
-> Maybe peerAddr
-> acc
-> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
-> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
-> AcceptedConnectionsLimit
-> (ConnectionManagerWithExpandedCtx
      'InitiatorResponderMode
      socket
      peerAddr
      DataFlowProtocolData
      UnversionedProtocol
      ByteString
      m
      [resp]
      acc
    -> peerAddr -> Async m Void -> m a)
-> m a
withBidirectionalConnectionManager name
name Timeouts
timeouts
                                   Tracer m (WithName name (RemoteTransitionTrace peerAddr))
inboundTrTracer Tracer m (WithName name (AbstractTransitionTrace ConnStateId))
trTracer
                                   Tracer
  m
  (WithName
     name
     (Trace
        peerAddr
        (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
tracer Tracer m (WithName name (Trace peerAddr))
inboundTracer Tracer m (WithName name (Debug peerAddr DataFlowProtocolData))
debugTracer
                                   StdGen
stdGen
                                   Snocket m socket peerAddr
snocket MakeBearer m socket
makeBearer ConnStateIdSupply m
connStateIdSupply
                                   socket -> m ()
confSock socket
socket
                                   Maybe peerAddr
localAddress
                                   acc
accumulatorInit TemperatureBundle (ConnectionId peerAddr -> STM m [req])
nextRequests
                                   ProtocolTimeLimits (Handshake UnversionedProtocol Term)
handshakeTimeLimits
                                   AcceptedConnectionsLimit
acceptedConnLimit ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  acc
-> peerAddr -> Async m Void -> m a
k = do
    mainThreadId <- m (ThreadId m)
forall (m :: * -> *). MonadThread m => m (ThreadId m)
myThreadId
    inbgovInfoChannel <- newInformationChannel
    let muxTracer = name
-> WithBearer (ConnectionId peerAddr) Trace
-> WithName name (WithBearer (ConnectionId peerAddr) Trace)
forall name event. name -> event -> WithName name event
WithName name
name (WithBearer (ConnectionId peerAddr) Trace
 -> WithName name (WithBearer (ConnectionId peerAddr) Trace))
-> Tracer
     m (WithName name (WithBearer (ConnectionId peerAddr) Trace))
-> Tracer m (WithBearer (ConnectionId peerAddr) Trace)
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer m (WithName name (WithBearer (ConnectionId peerAddr) Trace))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer -- mux tracer

    CM.with
      CM.Arguments {
          -- ConnectionManagerTrace
          CM.tracer    = WithName name
                        `contramap` tracer,
          CM.trTracer  = (WithName name . fmap CM.abstractState)
                        `contramap` trTracer,
          -- MuxTracer
          CM.muxTracer    = muxTracer,
          CM.ipv4Address  = localAddress,
          CM.ipv6Address  = Nothing,
          CM.addressType  = \peerAddr
_ -> AddressType -> Maybe AddressType
forall a. a -> Maybe a
Just AddressType
IPv4Address,
          CM.snocket      = snocket,
          CM.makeBearer   = makeBearer,
          CM.configureSocket = \socket
sock Maybe peerAddr
_ -> socket -> m ()
confSock socket
sock,
          CM.timeWaitTimeout = tTimeWaitTimeout timeouts,
          CM.outboundIdleTimeout = tOutboundIdleTimeout timeouts,
          CM.connectionDataFlow = \(DataFlowProtocolData DataFlow
df PeerSharing
_) -> DataFlow
df,
          CM.prunePolicy = simplePrunePolicy,
          CM.stdGen,
          CM.connectionsLimits = acceptedConnLimit,
          CM.updateVersionData = \DataFlowProtocolData
versionData DiffusionMode
diffusionMode ->
                                  DataFlowProtocolData
versionData { getProtocolDataFlow =
                                                  case diffusionMode of
                                                    DiffusionMode
InitiatorOnlyDiffusionMode         -> DataFlow
Unidirectional
                                                    DiffusionMode
InitiatorAndResponderDiffusionMode -> DataFlow
Duplex
                                              },
          CM.connStateIdSupply
        }
        (makeConnectionHandler
          muxTracer
          SingInitiatorResponderMode
          HandshakeArguments {
              -- TraceSendRecv
              haHandshakeTracer = WithName name `contramap` nullTracer,
              haHandshakeCodec = unversionedHandshakeCodec,
              haVersionDataCodec = cborTermVersionDataCodec dataFlowProtocolDataCodec,
              haAcceptVersion = acceptableVersion,
              haQueryVersion = queryVersion,
              haTimeLimits = handshakeTimeLimits
            }
          (dataFlowProtocol Duplex serverApplication)
          (mainThreadId,   debugMuxErrorRethrowPolicy
                        <> debugMuxRuntimeErrorRethrowPolicy
                        <> debugIOErrorRethrowPolicy
                        <> assertRethrowPolicy))
        (\HandleError 'InitiatorResponderMode UnversionedProtocol
_ -> HandleErrorType
HandshakeFailure)
        (InResponderMode inbgovInfoChannel)
      $ \ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  acc
connectionManager ->
          do
            serverAddr <- Snocket m socket peerAddr -> socket -> m peerAddr
forall (m :: * -> *) fd addr. Snocket m fd addr -> fd -> m addr
Snocket.getLocalAddr Snocket m socket peerAddr
snocket socket
socket
            Server.with
              Server.Arguments {
                  Server.sockets = socket :| [],
                  Server.snocket = snocket,
                  Server.trTracer =
                    WithName name `contramap` inboundTrTracer,
                  Server.tracer =
                    WithName name `contramap` nullTracer, -- ServerTrace
                  Server.debugInboundGovernor =
                    WithName name `contramap` debugTracer,
                  Server.inboundGovernorTracer =
                    WithName name `contramap` inboundTracer, -- InboundGovernorTrace
                  Server.connectionLimits = acceptedConnLimit,
                  Server.connectionManager = connectionManager,
                  Server.connectionDataFlow = \(DataFlowProtocolData DataFlow
df PeerSharing
_) -> DataFlow
df,
                  Server.inboundIdleTimeout = Just (tProtocolIdleTimeout timeouts),
                  Server.inboundInfoChannel = inbgovInfoChannel
                }
              (\Async m Void
inboundGovernorAsync m (PublicState peerAddr DataFlowProtocolData)
_ -> ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  acc
-> peerAddr -> Async m Void -> m a
k ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  acc
connectionManager peerAddr
serverAddr Async m Void
inboundGovernorAsync)
          m a -> (SomeException -> m a) -> m a
forall e a. Exception e => m a -> (e -> m a) -> m a
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` \(SomeException
e :: SomeException) -> do
            SomeException -> m a
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SomeException
e
  where
    serverApplication :: TemperatureBundle
                          [MiniProtocol Mx.InitiatorResponderMode
                                        (ExpandedInitiatorContext peerAddr m)
                                        (ResponderContext peerAddr)
                                        ByteString m [resp] acc]
    serverApplication :: OuroborosBundle
  'InitiatorResponderMode
  (ExpandedInitiatorContext peerAddr m)
  (ResponderContext peerAddr)
  ByteString
  m
  [resp]
  acc
serverApplication = MiniProtocolNum
-> (ConnectionId peerAddr -> STM m [req])
-> [MiniProtocol
      'InitiatorResponderMode
      (ExpandedInitiatorContext peerAddr m)
      (ResponderContext peerAddr)
      ByteString
      m
      [resp]
      acc]
mkProto (MiniProtocolNum
 -> (ConnectionId peerAddr -> STM m [req])
 -> [MiniProtocol
       'InitiatorResponderMode
       (ExpandedInitiatorContext peerAddr m)
       (ResponderContext peerAddr)
       ByteString
       m
       [resp]
       acc])
-> TemperatureBundle MiniProtocolNum
-> TemperatureBundle
     ((ConnectionId peerAddr -> STM m [req])
      -> [MiniProtocol
            'InitiatorResponderMode
            (ExpandedInitiatorContext peerAddr m)
            (ResponderContext peerAddr)
            ByteString
            m
            [resp]
            acc])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Word16 -> MiniProtocolNum
Mx.MiniProtocolNum (Word16 -> MiniProtocolNum)
-> TemperatureBundle Word16 -> TemperatureBundle MiniProtocolNum
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TemperatureBundle Word16
nums) TemperatureBundle
  ((ConnectionId peerAddr -> STM m [req])
   -> [MiniProtocol
         'InitiatorResponderMode
         (ExpandedInitiatorContext peerAddr m)
         (ResponderContext peerAddr)
         ByteString
         m
         [resp]
         acc])
-> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
-> OuroborosBundle
     'InitiatorResponderMode
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     ByteString
     m
     [resp]
     acc
forall a b.
TemperatureBundle (a -> b)
-> TemperatureBundle a -> TemperatureBundle b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
nextRequests
      where nums :: TemperatureBundle Word16
nums = WithProtocolTemperature 'Hot Word16
-> WithProtocolTemperature 'Warm Word16
-> WithProtocolTemperature 'Established Word16
-> TemperatureBundle Word16
forall a.
WithProtocolTemperature 'Hot a
-> WithProtocolTemperature 'Warm a
-> WithProtocolTemperature 'Established a
-> TemperatureBundle a
TemperatureBundle (Word16 -> WithProtocolTemperature 'Hot Word16
forall a. a -> WithProtocolTemperature 'Hot a
WithHot Word16
1) (Word16 -> WithProtocolTemperature 'Warm Word16
forall a. a -> WithProtocolTemperature 'Warm a
WithWarm Word16
2) (Word16 -> WithProtocolTemperature 'Established Word16
forall a. a -> WithProtocolTemperature 'Established a
WithEstablished Word16
3)
            mkProto :: MiniProtocolNum
-> (ConnectionId peerAddr -> STM m [req])
-> [MiniProtocol
      'InitiatorResponderMode
      (ExpandedInitiatorContext peerAddr m)
      (ResponderContext peerAddr)
      ByteString
      m
      [resp]
      acc]
mkProto MiniProtocolNum
miniProtocolNum ConnectionId peerAddr -> STM m [req]
nextRequest =
              [MiniProtocol {
                  MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum,
                  miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits = Int -> MiniProtocolLimits
Mx.MiniProtocolLimits Int
forall a. Bounded a => a
maxBound,
                  miniProtocolRun :: RunMiniProtocol
  'InitiatorResponderMode
  (ExpandedInitiatorContext peerAddr m)
  (ResponderContext peerAddr)
  ByteString
  m
  [resp]
  acc
miniProtocolRun = MiniProtocolNum
-> acc
-> (ConnectionId peerAddr -> STM m [req])
-> RunMiniProtocol
     'InitiatorResponderMode
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     ByteString
     m
     [resp]
     acc
reqRespInitiatorAndResponder
                                        MiniProtocolNum
miniProtocolNum
                                        acc
accumulatorInit
                                        ConnectionId peerAddr -> STM m [req]
nextRequest
              }]

    reqRespInitiatorAndResponder
      :: Mx.MiniProtocolNum
      -> acc
      -> (ConnectionId peerAddr -> STM m [req])
      -> RunMiniProtocol Mx.InitiatorResponderMode
                         (ExpandedInitiatorContext peerAddr m)
                         (ResponderContext peerAddr)
                         ByteString m [resp] acc
    reqRespInitiatorAndResponder :: MiniProtocolNum
-> acc
-> (ConnectionId peerAddr -> STM m [req])
-> RunMiniProtocol
     'InitiatorResponderMode
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     ByteString
     m
     [resp]
     acc
reqRespInitiatorAndResponder MiniProtocolNum
protocolNum acc
accInit ConnectionId peerAddr -> STM m [req]
nextRequest =
      MiniProtocolCb
  (ExpandedInitiatorContext peerAddr m) ByteString m [resp]
-> MiniProtocolCb (ResponderContext peerAddr) ByteString m acc
-> RunMiniProtocol
     'InitiatorResponderMode
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     ByteString
     m
     [resp]
     acc
forall initiatorCtx bytes (m :: * -> *) a responderCtx b.
MiniProtocolCb initiatorCtx bytes m a
-> MiniProtocolCb responderCtx bytes m b
-> RunMiniProtocol
     'InitiatorResponderMode initiatorCtx responderCtx bytes m a b
InitiatorAndResponderProtocol
        ((ExpandedInitiatorContext peerAddr m
 -> Channel m ByteString -> m ([resp], Maybe ByteString))
-> MiniProtocolCb
     (ExpandedInitiatorContext peerAddr m) ByteString m [resp]
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb ((ExpandedInitiatorContext peerAddr m
  -> Channel m ByteString -> m ([resp], Maybe ByteString))
 -> MiniProtocolCb
      (ExpandedInitiatorContext peerAddr m) ByteString m [resp])
-> (ExpandedInitiatorContext peerAddr m
    -> Channel m ByteString -> m ([resp], Maybe ByteString))
-> MiniProtocolCb
     (ExpandedInitiatorContext peerAddr m) ByteString m [resp]
forall a b. (a -> b) -> a -> b
$ \ExpandedInitiatorContext { eicConnectionId :: forall addr (m :: * -> *).
ExpandedInitiatorContext addr m -> ConnectionId addr
eicConnectionId = ConnectionId peerAddr
connId } Channel m ByteString
channel ->
           Tracer m (TraceSendRecv (ReqResp req resp))
-> Codec (ReqResp req resp) DeserialiseFailure m ByteString
-> ProtocolSizeLimits (ReqResp req resp) ByteString
-> ProtocolTimeLimits (ReqResp req resp)
-> Channel m ByteString
-> Peer (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp]
-> m ([resp], Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
       a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
 MonadTimer m, ShowProxy ps,
 forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
 Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runPeerWithLimits
             ((name, String, MiniProtocolNum)
-> TraceSendRecv (ReqResp req resp)
-> WithName
     (name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp))
forall name event. name -> event -> WithName name event
WithName (name
name,String
"Initiator",MiniProtocolNum
protocolNum) (TraceSendRecv (ReqResp req resp)
 -> WithName
      (name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp)))
-> Tracer
     m
     (WithName
        (name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp)))
-> Tracer m (TraceSendRecv (ReqResp req resp))
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer
  m
  (WithName
     (name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp)))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer)
             -- TraceSendRecv
             Codec (ReqResp req resp) DeserialiseFailure m ByteString
forall req resp (m :: * -> *).
(MonadST m, Serialise req, Serialise resp) =>
Codec (ReqResp req resp) DeserialiseFailure m ByteString
codecReqResp
             ProtocolSizeLimits (ReqResp req resp) ByteString
forall {k} {k1} (req :: k) (resp :: k1).
ProtocolSizeLimits (ReqResp req resp) ByteString
reqRespSizeLimits
             ProtocolTimeLimits (ReqResp req resp)
forall {k} {k1} (req :: k) (resp :: k1).
ProtocolTimeLimits (ReqResp req resp)
reqRespTimeLimits
             Channel m ByteString
channel
             (m (Peer
     (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp])
-> Peer (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp]
forall ps (pl :: IsPipelined) (st :: ps) (m :: * -> *) a.
m (Client ps pl st m a) -> Client ps pl st m a
Effect (m (Peer
      (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp])
 -> Peer
      (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp])
-> m (Peer
        (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp])
-> Peer (ReqResp req resp) 'AsClient 'NonPipelined 'StIdle m [resp]
forall a b. (a -> b) -> a -> b
$ do
               reqs <- STM m [req] -> m [req]
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (ConnectionId peerAddr -> STM m [req]
nextRequest ConnectionId peerAddr
connId)
               pure $ reqRespClientPeer (reqRespClientMap reqs)))
        ((ResponderContext peerAddr
 -> Channel m ByteString -> m (acc, Maybe ByteString))
-> MiniProtocolCb (ResponderContext peerAddr) ByteString m acc
forall ctx bytes (m :: * -> *) a.
(ctx -> Channel m bytes -> m (a, Maybe bytes))
-> MiniProtocolCb ctx bytes m a
MiniProtocolCb ((ResponderContext peerAddr
  -> Channel m ByteString -> m (acc, Maybe ByteString))
 -> MiniProtocolCb (ResponderContext peerAddr) ByteString m acc)
-> (ResponderContext peerAddr
    -> Channel m ByteString -> m (acc, Maybe ByteString))
-> MiniProtocolCb (ResponderContext peerAddr) ByteString m acc
forall a b. (a -> b) -> a -> b
$ \ResponderContext peerAddr
_ctx Channel m ByteString
channel ->
           Tracer m (TraceSendRecv (ReqResp req resp))
-> Codec (ReqResp req resp) DeserialiseFailure m ByteString
-> ProtocolSizeLimits (ReqResp req resp) ByteString
-> ProtocolTimeLimits (ReqResp req resp)
-> Channel m ByteString
-> Peer (ReqResp req resp) 'AsServer 'NonPipelined 'StIdle m acc
-> m (acc, Maybe ByteString)
forall ps (st :: ps) (pr :: PeerRole) failure bytes (m :: * -> *)
       a.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
 MonadTimer m, ShowProxy ps,
 forall (st' :: ps) stok. (stok ~ StateToken st') => Show stok,
 Show failure) =>
Tracer m (TraceSendRecv ps)
-> Codec ps failure m bytes
-> ProtocolSizeLimits ps bytes
-> ProtocolTimeLimits ps
-> Channel m bytes
-> Peer ps pr 'NonPipelined st m a
-> m (a, Maybe bytes)
runPeerWithLimits
             ((name, String, MiniProtocolNum)
-> TraceSendRecv (ReqResp req resp)
-> WithName
     (name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp))
forall name event. name -> event -> WithName name event
WithName (name
name,String
"Responder",MiniProtocolNum
protocolNum) (TraceSendRecv (ReqResp req resp)
 -> WithName
      (name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp)))
-> Tracer
     m
     (WithName
        (name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp)))
-> Tracer m (TraceSendRecv (ReqResp req resp))
forall a' a. (a' -> a) -> Tracer m a -> Tracer m a'
forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
`contramap` Tracer
  m
  (WithName
     (name, String, MiniProtocolNum) (TraceSendRecv (ReqResp req resp)))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer)
             -- TraceSendRecv
             Codec (ReqResp req resp) DeserialiseFailure m ByteString
forall req resp (m :: * -> *).
(MonadST m, Serialise req, Serialise resp) =>
Codec (ReqResp req resp) DeserialiseFailure m ByteString
codecReqResp
             ProtocolSizeLimits (ReqResp req resp) ByteString
forall {k} {k1} (req :: k) (resp :: k1).
ProtocolSizeLimits (ReqResp req resp) ByteString
reqRespSizeLimits
             ProtocolTimeLimits (ReqResp req resp)
forall {k} {k1} (req :: k) (resp :: k1).
ProtocolTimeLimits (ReqResp req resp)
reqRespTimeLimits
             Channel m ByteString
channel
             (ReqRespServer req resp m acc
-> Peer (ReqResp req resp) 'AsServer 'NonPipelined 'StIdle m acc
forall (m :: * -> *) req resp a.
Monad m =>
ReqRespServer req resp m a
-> Server (ReqResp req resp) 'NonPipelined 'StIdle m a
reqRespServerPeer (ReqRespServer req resp m acc
 -> Peer (ReqResp req resp) 'AsServer 'NonPipelined 'StIdle m acc)
-> ReqRespServer req resp m acc
-> Peer (ReqResp req resp) 'AsServer 'NonPipelined 'StIdle m acc
forall a b. (a -> b) -> a -> b
$ acc -> ReqRespServer req resp m acc
reqRespServerMapAccumL' acc
accInit))

    reqRespServerMapAccumL' :: acc -> ReqRespServer req resp m acc
    reqRespServerMapAccumL' :: acc -> ReqRespServer req resp m acc
reqRespServerMapAccumL' = acc -> ReqRespServer req resp m acc
[req] -> ReqRespServer req [req] m [req]
forall {m :: * -> *} {a}.
Monad m =>
[a] -> ReqRespServer a [a] m [a]
go
      where
        fn :: [a] -> a -> ([a], [a])
fn [a]
acc a
x = (a
x a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
acc, a
x a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
acc)
        go :: [a] -> ReqRespServer a [a] m [a]
go [a]
acc =
          ReqRespServer {
              recvMsgReq :: a -> m ([a], ReqRespServer a [a] m [a])
recvMsgReq = \a
req ->
                  let ([a]
acc', [a]
resp) = [a] -> a -> ([a], [a])
forall {a}. [a] -> a -> ([a], [a])
fn [a]
acc a
req
                  in ([a], ReqRespServer a [a] m [a])
-> m ([a], ReqRespServer a [a] m [a])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ([a]
resp, [a] -> ReqRespServer a [a] m [a]
go [a]
acc'),
              recvMsgDone :: m [a]
recvMsgDone = [a] -> m [a]
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return [a]
acc
            }


reqRespSizeLimits :: forall req resp. ProtocolSizeLimits (ReqResp req resp)
                                                         ByteString
reqRespSizeLimits :: forall {k} {k1} (req :: k) (resp :: k1).
ProtocolSizeLimits (ReqResp req resp) ByteString
reqRespSizeLimits = ProtocolSizeLimits
    { StateToken st -> Word
forall (st :: ReqResp req resp).
ActiveState st =>
StateToken st -> Word
forall (st :: ReqResp req resp). StateToken st -> Word
sizeLimitForState :: forall (st :: ReqResp req resp). StateToken st -> Word
sizeLimitForState :: forall (st :: ReqResp req resp).
ActiveState st =>
StateToken st -> Word
sizeLimitForState
    , dataSize :: ByteString -> Word
dataSize = Int64 -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> Word) -> (ByteString -> Int64) -> ByteString -> Word
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Int64
LBS.length
    }
  where
    sizeLimitForState :: forall (st :: ReqResp req resp).
                         StateToken st -> Word
    sizeLimitForState :: forall (st :: ReqResp req resp). StateToken st -> Word
sizeLimitForState StateToken st
_ = Word
forall a. Bounded a => a
maxBound

reqRespTimeLimits :: forall req resp. ProtocolTimeLimits (ReqResp req resp)
reqRespTimeLimits :: forall {k} {k1} (req :: k) (resp :: k1).
ProtocolTimeLimits (ReqResp req resp)
reqRespTimeLimits = ProtocolTimeLimits { StateToken st -> Maybe DiffTime
forall (st :: ReqResp req resp).
ActiveState st =>
StateToken st -> Maybe DiffTime
timeLimitForState :: forall (st :: ReqResp req resp).
ActiveState st =>
StateToken st -> Maybe DiffTime
timeLimitForState :: forall (st :: ReqResp req resp).
ActiveState st =>
StateToken st -> Maybe DiffTime
timeLimitForState }
  where
    timeLimitForState :: forall (st :: ReqResp req resp).
                         ActiveState st
                      => StateToken st -> Maybe DiffTime
    timeLimitForState :: forall (st :: ReqResp req resp).
ActiveState st =>
StateToken st -> Maybe DiffTime
timeLimitForState StateToken st
SReqResp st
SingIdle           = Maybe DiffTime
forall a. Maybe a
Nothing
    timeLimitForState StateToken st
SReqResp st
SingBusy           = DiffTime -> Maybe DiffTime
forall a. a -> Maybe a
Just DiffTime
60
    timeLimitForState a :: StateToken st
a@StateToken st
SReqResp st
ReqResp.SingDone = StateToken 'StDone -> forall a. a
forall ps (st :: ps).
(StateAgency st ~ 'NobodyAgency, ActiveState st) =>
StateToken st -> forall a. a
notActiveState StateToken st
StateToken 'StDone
a



-- | Run all initiator mini-protocols and collect results. Throw exception if
-- any of the thread returned an exception.
--
-- This function assumes that there's one established, one warm and one hot
-- mini-protocol, which is compatible with both
--
-- * 'withInitiatorOnlyConnectionManager', and
-- * 'withBidirectionalConnectionManager'.
--
runInitiatorProtocols
    :: forall muxMode addr m a b.
       ( Alternative (STM m)
       , MonadAsync       m
       , MonadCatch       m
       , MonadLabelledSTM m
       , MonadMask        m
       , MonadSTM         m
       , MonadThrow  (STM m)
       , HasInitiator muxMode ~ True
       , MonadSay         m
       )
    => SingMuxMode muxMode
    -> Mx.Mux muxMode m
    -> OuroborosBundle muxMode (ExpandedInitiatorContext addr m)
                               (ResponderContext addr)
                               ByteString m a b
    -> TemperatureBundle (StrictTVar m ControlMessage)
    -> ConnectionId addr
    -> m (TemperatureBundle a)
runInitiatorProtocols :: forall (muxMode :: Mode) addr (m :: * -> *) a b.
(Alternative (STM m), MonadAsync m, MonadCatch m,
 MonadLabelledSTM m, MonadMask m, MonadSTM m, MonadThrow (STM m),
 HasInitiator muxMode ~ 'True, MonadSay m) =>
SingMuxMode muxMode
-> Mux muxMode m
-> OuroborosBundle
     muxMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     b
-> TemperatureBundle (StrictTVar m ControlMessage)
-> ConnectionId addr
-> m (TemperatureBundle a)
runInitiatorProtocols SingMuxMode muxMode
singMuxMode Mux muxMode m
mux OuroborosBundle
  muxMode
  (ExpandedInitiatorContext addr m)
  (ResponderContext addr)
  ByteString
  m
  a
  b
bundle TemperatureBundle (StrictTVar m ControlMessage)
controlBundle ConnectionId addr
connId = do
    -- start all mini-protocols
    bundle' <- ((MiniProtocolWithExpandedCtx muxMode addr ByteString m a b,
  STM m ControlMessage)
 -> m (STM m (Either SomeException a)))
-> TemperatureBundle
     (MiniProtocolWithExpandedCtx muxMode addr ByteString m a b,
      STM m ControlMessage)
-> m (TemperatureBundle (STM m (Either SomeException a)))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> TemperatureBundle a -> f (TemperatureBundle b)
traverse ((MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
 -> STM m ControlMessage -> m (STM m (Either SomeException a)))
-> (MiniProtocolWithExpandedCtx muxMode addr ByteString m a b,
    STM m ControlMessage)
-> m (STM m (Either SomeException a))
forall a b c. (a -> b -> c) -> (a, b) -> c
uncurry MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
-> STM m ControlMessage -> m (STM m (Either SomeException a))
runInitiator) ((,) (MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
 -> STM m ControlMessage
 -> (MiniProtocolWithExpandedCtx muxMode addr ByteString m a b,
     STM m ControlMessage))
-> TemperatureBundle
     (MiniProtocolWithExpandedCtx muxMode addr ByteString m a b)
-> TemperatureBundle
     (STM m ControlMessage
      -> (MiniProtocolWithExpandedCtx muxMode addr ByteString m a b,
          STM m ControlMessage))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ([MiniProtocolWithExpandedCtx muxMode addr ByteString m a b]
-> MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
forall a. HasCallStack => [a] -> a
head ([MiniProtocolWithExpandedCtx muxMode addr ByteString m a b]
 -> MiniProtocolWithExpandedCtx muxMode addr ByteString m a b)
-> OuroborosBundle
     muxMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     b
-> TemperatureBundle
     (MiniProtocolWithExpandedCtx muxMode addr ByteString m a b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> OuroborosBundle
  muxMode
  (ExpandedInitiatorContext addr m)
  (ResponderContext addr)
  ByteString
  m
  a
  b
bundle)
                                                    TemperatureBundle
  (STM m ControlMessage
   -> (MiniProtocolWithExpandedCtx muxMode addr ByteString m a b,
       STM m ControlMessage))
-> TemperatureBundle (STM m ControlMessage)
-> TemperatureBundle
     (MiniProtocolWithExpandedCtx muxMode addr ByteString m a b,
      STM m ControlMessage)
forall a b.
TemperatureBundle (a -> b)
-> TemperatureBundle a -> TemperatureBundle b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> (StrictTVar m ControlMessage -> STM m ControlMessage
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar (StrictTVar m ControlMessage -> STM m ControlMessage)
-> TemperatureBundle (StrictTVar m ControlMessage)
-> TemperatureBundle (STM m ControlMessage)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TemperatureBundle (StrictTVar m ControlMessage)
controlBundle))
    -- await for their termination
    traverse (atomically >=> either throwIO return)
             bundle'
  where
    runInitiator :: MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
                 -> ControlMessageSTM m
                 -> m (STM m (Either SomeException a))
    runInitiator :: MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
-> STM m ControlMessage -> m (STM m (Either SomeException a))
runInitiator MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
ptcl STM m ControlMessage
controlMessage =
        Mux muxMode m
-> MiniProtocolNum
-> MiniProtocolDirection muxMode
-> StartOnDemandOrEagerly
-> (ByteChannel m -> m (a, Maybe ByteString))
-> m (STM m (Either SomeException a))
forall (mode :: Mode) (m :: * -> *) a.
(Alternative (STM m), MonadSTM m, MonadThrow m,
 MonadThrow (STM m)) =>
Mux mode m
-> MiniProtocolNum
-> MiniProtocolDirection mode
-> StartOnDemandOrEagerly
-> (ByteChannel m -> m (a, Maybe ByteString))
-> m (STM m (Either SomeException a))
Mx.runMiniProtocol
          Mux muxMode m
mux
          (MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
-> MiniProtocolNum
forall (mode :: Mode) initiatorCtx responderCtx bytes (m :: * -> *)
       a b.
MiniProtocol mode initiatorCtx responderCtx bytes m a b
-> MiniProtocolNum
miniProtocolNum MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
ptcl)
          (case SingMuxMode muxMode
singMuxMode of
            SingMuxMode muxMode
SingInitiatorMode          -> MiniProtocolDirection muxMode
MiniProtocolDirection 'InitiatorMode
Mx.InitiatorDirectionOnly
            SingMuxMode muxMode
SingInitiatorResponderMode -> MiniProtocolDirection muxMode
MiniProtocolDirection 'InitiatorResponderMode
Mx.InitiatorDirection)
          StartOnDemandOrEagerly
Mx.StartEagerly
          (MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
-> ExpandedInitiatorContext addr m
-> ByteChannel m
-> m (a, Maybe ByteString)
forall ctx bytes (m :: * -> *) a.
MiniProtocolCb ctx bytes m a
-> ctx -> Channel m bytes -> m (a, Maybe bytes)
runMiniProtocolCb
            (case MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
-> RunMiniProtocol
     muxMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     b
forall (mode :: Mode) initiatorCtx responderCtx bytes (m :: * -> *)
       a b.
MiniProtocol mode initiatorCtx responderCtx bytes m a b
-> RunMiniProtocol mode initiatorCtx responderCtx bytes m a b
miniProtocolRun MiniProtocolWithExpandedCtx muxMode addr ByteString m a b
ptcl of
              InitiatorProtocolOnly MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
initiator           -> MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
initiator
              InitiatorAndResponderProtocol MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
initiator MiniProtocolCb (ResponderContext addr) ByteString m b
_ -> MiniProtocolCb (ExpandedInitiatorContext addr m) ByteString m a
initiator)
            ExpandedInitiatorContext addr m
initiatorCtx)
      where
        initiatorCtx :: ExpandedInitiatorContext addr m
initiatorCtx = ExpandedInitiatorContext {
            eicConnectionId :: ConnectionId addr
eicConnectionId    = ConnectionId addr
connId,
            eicControlMessage :: STM m ControlMessage
eicControlMessage  = STM m ControlMessage
controlMessage,
            eicIsBigLedgerPeer :: IsBigLedgerPeer
eicIsBigLedgerPeer = IsBigLedgerPeer
IsNotBigLedgerPeer
          }

--
-- Experiments \/ Demos & Properties
--

-- | Max bound AcceptedConnectionsLimit
maxAcceptedConnectionsLimit :: AcceptedConnectionsLimit
maxAcceptedConnectionsLimit :: AcceptedConnectionsLimit
maxAcceptedConnectionsLimit = Word32 -> Word32 -> DiffTime -> AcceptedConnectionsLimit
AcceptedConnectionsLimit Word32
forall a. Bounded a => a
maxBound Word32
forall a. Bounded a => a
maxBound DiffTime
0


-- | This test runs an initiator only connection manager (client side) and bidirectional
-- connection manager (which runs a server).  The client connect to the
-- server and runs protocols to completion.
--
-- There is a good reason why we don't run two bidirectional connection managers;
-- If we would do that, when the either side terminates the connection the
-- client side server would through an exception as it is listening.
--
unidirectionalExperiment
    :: forall peerAddr socket acc req resp m.
       ( ConnectionManagerMonad m
       , MonadAsync m
       , MonadDelay m
       , MonadFix m
       , MonadLabelledSTM m
       , MonadTraceSTM m
       , MonadSay m

       , acc ~ [req], resp ~ [req]
       , Ord peerAddr, Show peerAddr, Typeable peerAddr, Eq peerAddr
       , Serialise req, Show req
       , Serialise resp, Show resp, Eq resp
       , Typeable req, Typeable resp
       )
    => StdGen
    -> Timeouts
    -> Snocket m socket peerAddr
    -> Mx.MakeBearer m socket
    -> (socket -> m ())
    -> socket
    -> ClientAndServerData req
    -> m Property
unidirectionalExperiment :: forall peerAddr socket acc req resp (m :: * -> *).
(ConnectionManagerMonad m, MonadAsync m, MonadDelay m, MonadFix m,
 MonadLabelledSTM m, MonadTraceSTM m, MonadSay m, acc ~ [req],
 resp ~ [req], Ord peerAddr, Show peerAddr, Typeable peerAddr,
 Eq peerAddr, Serialise req, Show req, Serialise resp, Show resp,
 Eq resp, Typeable req, Typeable resp) =>
StdGen
-> Timeouts
-> Snocket m socket peerAddr
-> MakeBearer m socket
-> (socket -> m ())
-> socket
-> ClientAndServerData req
-> m Property
unidirectionalExperiment StdGen
stdGen Timeouts
timeouts Snocket m socket peerAddr
snocket MakeBearer m socket
makeBearer socket -> m ()
confSock socket
socket ClientAndServerData req
clientAndServerData = do
    let (StdGen
stdGen', StdGen
stdGen'') = StdGen -> (StdGen, StdGen)
forall g. RandomGen g => g -> (g, g)
split StdGen
stdGen
    nextReqs <- ClientAndServerData req
-> m (TemperatureBundle (ConnectionId peerAddr -> STM m [req]))
forall req peerAddr (m :: * -> *).
MonadSTM m =>
ClientAndServerData req
-> m (TemperatureBundle (ConnectionId peerAddr -> STM m [req]))
oneshotNextRequests ClientAndServerData req
clientAndServerData
    connStateIdSupply <- atomically $ CM.newConnStateIdSupply (Proxy @m)
    withInitiatorOnlyConnectionManager
      "client" timeouts nullTracer nullTracer stdGen' snocket makeBearer connStateIdSupply Nothing nextReqs
      timeLimitsHandshake maxAcceptedConnectionsLimit
      $ \ConnectionManagerWithExpandedCtx
  'InitiatorMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  Void
connectionManager ->
        String
-> Timeouts
-> Tracer m (WithName String (RemoteTransitionTrace peerAddr))
-> Tracer m (WithName String (AbstractTransitionTrace ConnStateId))
-> Tracer
     m
     (WithName
        String
        (Trace
           peerAddr
           (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
-> Tracer m (WithName String (Trace peerAddr))
-> Tracer m (WithName String (Debug peerAddr DataFlowProtocolData))
-> StdGen
-> Snocket m socket peerAddr
-> MakeBearer m socket
-> ConnStateIdSupply m
-> (socket -> m ())
-> socket
-> Maybe peerAddr
-> [req]
-> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
-> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
-> AcceptedConnectionsLimit
-> (ConnectionManager
      'InitiatorResponderMode
      socket
      peerAddr
      (Handle
         'InitiatorResponderMode
         (ExpandedInitiatorContext peerAddr m)
         (ResponderContext peerAddr)
         DataFlowProtocolData
         ByteString
         m
         [[req]]
         [req])
      (HandleError 'InitiatorResponderMode UnversionedProtocol)
      m
    -> peerAddr -> Async m Void -> m Property)
-> m Property
forall name peerAddr socket acc req resp (m :: * -> *) a.
(ConnectionManagerMonad m, acc ~ [req], resp ~ [req], Ord peerAddr,
 Show peerAddr, Typeable peerAddr, Serialise req, Typeable req,
 MonadAsync m, MonadDelay m, MonadFix m, MonadLabelledSTM m,
 MonadTraceSTM m, MonadSay m, Show req, Show name) =>
name
-> Timeouts
-> Tracer m (WithName name (RemoteTransitionTrace peerAddr))
-> Tracer m (WithName name (AbstractTransitionTrace ConnStateId))
-> Tracer
     m
     (WithName
        name
        (Trace
           peerAddr
           (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
-> Tracer m (WithName name (Trace peerAddr))
-> Tracer m (WithName name (Debug peerAddr DataFlowProtocolData))
-> StdGen
-> Snocket m socket peerAddr
-> MakeBearer m socket
-> ConnStateIdSupply m
-> (socket -> m ())
-> socket
-> Maybe peerAddr
-> acc
-> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
-> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
-> AcceptedConnectionsLimit
-> (ConnectionManagerWithExpandedCtx
      'InitiatorResponderMode
      socket
      peerAddr
      DataFlowProtocolData
      UnversionedProtocol
      ByteString
      m
      [resp]
      acc
    -> peerAddr -> Async m Void -> m a)
-> m a
withBidirectionalConnectionManager String
"server" Timeouts
timeouts
                                           Tracer m (WithName String (RemoteTransitionTrace peerAddr))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer Tracer m (WithName String (AbstractTransitionTrace ConnStateId))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer Tracer
  m
  (WithName
     String
     (Trace
        peerAddr
        (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
                                           Tracer m (WithName String (Trace peerAddr))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer Tracer m (WithName String (Debug peerAddr DataFlowProtocolData))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
                                           StdGen
stdGen''
                                           Snocket m socket peerAddr
snocket MakeBearer m socket
makeBearer ConnStateIdSupply m
connStateIdSupply
                                           socket -> m ()
confSock socket
socket Maybe peerAddr
forall a. Maybe a
Nothing
                                           [ClientAndServerData req -> req
forall req. ClientAndServerData req -> req
accumulatorInit ClientAndServerData req
clientAndServerData]
                                           TemperatureBundle (ConnectionId peerAddr -> STM m [req])
forall (stm :: * -> *) req peerAddr.
Applicative stm =>
TemperatureBundle (ConnectionId peerAddr -> stm [req])
noNextRequests
                                           ProtocolTimeLimits (Handshake UnversionedProtocol Term)
forall {k} (vNumber :: k).
ProtocolTimeLimits (Handshake vNumber Term)
timeLimitsHandshake
                                           AcceptedConnectionsLimit
maxAcceptedConnectionsLimit
          ((ConnectionManager
    'InitiatorResponderMode
    socket
    peerAddr
    (Handle
       'InitiatorResponderMode
       (ExpandedInitiatorContext peerAddr m)
       (ResponderContext peerAddr)
       DataFlowProtocolData
       ByteString
       m
       [[req]]
       [req])
    (HandleError 'InitiatorResponderMode UnversionedProtocol)
    m
  -> peerAddr -> Async m Void -> m Property)
 -> m Property)
-> (ConnectionManager
      'InitiatorResponderMode
      socket
      peerAddr
      (Handle
         'InitiatorResponderMode
         (ExpandedInitiatorContext peerAddr m)
         (ResponderContext peerAddr)
         DataFlowProtocolData
         ByteString
         m
         [[req]]
         [req])
      (HandleError 'InitiatorResponderMode UnversionedProtocol)
      m
    -> peerAddr -> Async m Void -> m Property)
-> m Property
forall a b. (a -> b) -> a -> b
$ \ConnectionManager
  'InitiatorResponderMode
  socket
  peerAddr
  (Handle
     'InitiatorResponderMode
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     DataFlowProtocolData
     ByteString
     m
     [[req]]
     [req])
  (HandleError 'InitiatorResponderMode UnversionedProtocol)
  m
_ peerAddr
serverAddr Async m Void
_serverAsync -> do
            -- client → server: connect
            (rs :: [Either SomeException (TemperatureBundle [resp])]) <-
                Int
-> m (Either SomeException (TemperatureBundle [resp]))
-> m [Either SomeException (TemperatureBundle [resp])]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM
                  (ClientAndServerData req -> Int
forall req. ClientAndServerData req -> Int
numberOfRounds ClientAndServerData req
clientAndServerData)
                  (m (Connected
     peerAddr
     (HandleWithExpandedCtx
        'InitiatorMode
        peerAddr
        DataFlowProtocolData
        ByteString
        m
        [resp]
        Void)
     (HandleError 'InitiatorMode UnversionedProtocol))
-> (Connected
      peerAddr
      (HandleWithExpandedCtx
         'InitiatorMode
         peerAddr
         DataFlowProtocolData
         ByteString
         m
         [resp]
         Void)
      (HandleError 'InitiatorMode UnversionedProtocol)
    -> m (OperationResult AbstractState))
-> (Connected
      peerAddr
      (HandleWithExpandedCtx
         'InitiatorMode
         peerAddr
         DataFlowProtocolData
         ByteString
         m
         [resp]
         Void)
      (HandleError 'InitiatorMode UnversionedProtocol)
    -> m (Either SomeException (TemperatureBundle [resp])))
-> m (Either SomeException (TemperatureBundle [resp]))
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket
                     (ConnectionManagerWithExpandedCtx
  'InitiatorMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  Void
-> AcquireOutboundConnection
     peerAddr
     (HandleWithExpandedCtx
        'InitiatorMode
        peerAddr
        DataFlowProtocolData
        ByteString
        m
        [resp]
        Void)
     (HandleError 'InitiatorMode UnversionedProtocol)
     m
forall (muxMode :: Mode) socket peerAddr handle handleError
       (m :: * -> *).
(HasInitiator muxMode ~ 'True) =>
ConnectionManager muxMode socket peerAddr handle handleError m
-> AcquireOutboundConnection peerAddr handle handleError m
acquireOutboundConnection ConnectionManagerWithExpandedCtx
  'InitiatorMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  Void
connectionManager DiffusionMode
InitiatorOnlyDiffusionMode peerAddr
serverAddr)
                     (\case
                        Connected ConnectionId peerAddr
connId DataFlow
_ HandleWithExpandedCtx
  'InitiatorMode
  peerAddr
  DataFlowProtocolData
  ByteString
  m
  [resp]
  Void
_ -> ConnectionManagerWithExpandedCtx
  'InitiatorMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  Void
-> ConnectionId peerAddr -> m (OperationResult AbstractState)
forall (muxMode :: Mode) socket peerAddr handle handleError
       (m :: * -> *).
(HasInitiator muxMode ~ 'True) =>
ConnectionManager muxMode socket peerAddr handle handleError m
-> ConnectionId peerAddr -> m (OperationResult AbstractState)
releaseOutboundConnection ConnectionManagerWithExpandedCtx
  'InitiatorMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  Void
connectionManager ConnectionId peerAddr
connId
                        Disconnected {} -> String -> m (OperationResult AbstractState)
forall a. HasCallStack => String -> a
error String
"unidirectionalExperiment: impossible happened")
                     (\Connected
  peerAddr
  (HandleWithExpandedCtx
     'InitiatorMode
     peerAddr
     DataFlowProtocolData
     ByteString
     m
     [resp]
     Void)
  (HandleError 'InitiatorMode UnversionedProtocol)
connHandle -> do
                      case Connected
  peerAddr
  (HandleWithExpandedCtx
     'InitiatorMode
     peerAddr
     DataFlowProtocolData
     ByteString
     m
     [resp]
     Void)
  (HandleError 'InitiatorMode UnversionedProtocol)
connHandle of
                        Connected ConnectionId peerAddr
connId DataFlow
_ (Handle Mux 'InitiatorMode m
mux OuroborosBundle
  'InitiatorMode
  (ExpandedInitiatorContext peerAddr m)
  (ResponderContext peerAddr)
  ByteString
  m
  [resp]
  Void
muxBundle TemperatureBundle (StrictTVar m ControlMessage)
controlBundle DataFlowProtocolData
_
                                        :: HandleWithExpandedCtx Mx.InitiatorMode peerAddr
                                              DataFlowProtocolData ByteString m [resp] Void) ->
                          forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try @_ @SomeException (m (TemperatureBundle [resp])
 -> m (Either SomeException (TemperatureBundle [resp])))
-> m (TemperatureBundle [resp])
-> m (Either SomeException (TemperatureBundle [resp]))
forall a b. (a -> b) -> a -> b
$
                            (SingMuxMode 'InitiatorMode
-> Mux 'InitiatorMode m
-> OuroborosBundle
     'InitiatorMode
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     ByteString
     m
     [resp]
     Void
-> TemperatureBundle (StrictTVar m ControlMessage)
-> ConnectionId peerAddr
-> m (TemperatureBundle [resp])
forall (muxMode :: Mode) addr (m :: * -> *) a b.
(Alternative (STM m), MonadAsync m, MonadCatch m,
 MonadLabelledSTM m, MonadMask m, MonadSTM m, MonadThrow (STM m),
 HasInitiator muxMode ~ 'True, MonadSay m) =>
SingMuxMode muxMode
-> Mux muxMode m
-> OuroborosBundle
     muxMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     b
-> TemperatureBundle (StrictTVar m ControlMessage)
-> ConnectionId addr
-> m (TemperatureBundle a)
runInitiatorProtocols
                              SingMuxMode 'InitiatorMode
SingInitiatorMode Mux 'InitiatorMode m
mux OuroborosBundle
  'InitiatorMode
  (ExpandedInitiatorContext peerAddr m)
  (ResponderContext peerAddr)
  ByteString
  m
  [resp]
  Void
muxBundle TemperatureBundle (StrictTVar m ControlMessage)
controlBundle ConnectionId peerAddr
connId
                              :: m (TemperatureBundle [resp])
                            )
                        Disconnected ConnectionId peerAddr
_ Maybe (HandleError 'InitiatorMode UnversionedProtocol)
err ->
                          IOError -> m (Either SomeException (TemperatureBundle [resp]))
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (String -> IOError
userError (String -> IOError) -> String -> IOError
forall a b. (a -> b) -> a -> b
$ String
"unidirectionalExperiment: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Maybe (HandleError 'InitiatorMode UnversionedProtocol) -> String
forall a. Show a => a -> String
show Maybe (HandleError 'InitiatorMode UnversionedProtocol)
err))
                  )
            pure $
              foldr
                (\ (Either SomeException (TemperatureBundle [resp])
r, TemperatureBundle [resp]
expected) Property
acc ->
                  case Either SomeException (TemperatureBundle [resp])
r of
                    Left SomeException
err -> String -> Bool -> Property
forall prop. Testable prop => String -> prop -> Property
counterexample (SomeException -> String
forall a. Show a => a -> String
show SomeException
err) Bool
False
                    Right TemperatureBundle [resp]
a  -> TemperatureBundle [resp]
a TemperatureBundle [resp] -> TemperatureBundle [resp] -> Property
forall a. (Eq a, Show a) => a -> a -> Property
=== TemperatureBundle [resp]
expected Property -> Property -> Property
forall prop1 prop2.
(Testable prop1, Testable prop2) =>
prop1 -> prop2 -> Property
.&&. Property
acc)
                (property True)
                $ zip rs (expectedResult clientAndServerData clientAndServerData)


-- | Bidirectional send and receive.
--
bidirectionalExperiment
    :: forall peerAddr socket acc req resp m.
       ( ConnectionManagerMonad m
       , MonadAsync m
       , MonadDelay m
       , MonadFix m
       , MonadLabelledSTM m
       , MonadTraceSTM m
       , MonadSay m

       , acc ~ [req], resp ~ [req]
       , Ord peerAddr, Show peerAddr, Typeable peerAddr, Eq peerAddr

       , Serialise req, Show req
       , Serialise resp, Show resp, Eq resp
       , Typeable req, Typeable resp
       , Show acc
       )
    => Bool
    -> StdGen
    -> Timeouts
    -> Snocket m socket peerAddr
    -> Mx.MakeBearer m socket
    -> (socket -> m ()) -- ^ configure socket
    -> socket
    -> socket
    -> peerAddr
    -> peerAddr
    -> ClientAndServerData req
    -> ClientAndServerData req
    -> m Property
bidirectionalExperiment :: forall peerAddr socket acc req resp (m :: * -> *).
(ConnectionManagerMonad m, MonadAsync m, MonadDelay m, MonadFix m,
 MonadLabelledSTM m, MonadTraceSTM m, MonadSay m, acc ~ [req],
 resp ~ [req], Ord peerAddr, Show peerAddr, Typeable peerAddr,
 Eq peerAddr, Serialise req, Show req, Serialise resp, Show resp,
 Eq resp, Typeable req, Typeable resp, Show acc) =>
Bool
-> StdGen
-> Timeouts
-> Snocket m socket peerAddr
-> MakeBearer m socket
-> (socket -> m ())
-> socket
-> socket
-> peerAddr
-> peerAddr
-> ClientAndServerData req
-> ClientAndServerData req
-> m Property
bidirectionalExperiment
    Bool
useLock StdGen
stdGen Timeouts
timeouts Snocket m socket peerAddr
snocket MakeBearer m socket
makeBearer socket -> m ()
confSock socket
socket0 socket
socket1 peerAddr
localAddr0 peerAddr
localAddr1
    ClientAndServerData req
clientAndServerData0 ClientAndServerData req
clientAndServerData1 = do
      let (StdGen
stdGen', StdGen
stdGen'') = StdGen -> (StdGen, StdGen)
forall g. RandomGen g => g -> (g, g)
split StdGen
stdGen
      lock <- () -> m (StrictTMVar m ())
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTMVar m a)
newTMVarIO ()
      connStateIdSupply <- atomically $ CM.newConnStateIdSupply (Proxy @m)
      nextRequests0 <- oneshotNextRequests clientAndServerData0
      nextRequests1 <- oneshotNextRequests clientAndServerData1
      withBidirectionalConnectionManager "node-0" timeouts
                                         nullTracer nullTracer nullTracer nullTracer
                                         nullTracer stdGen' snocket makeBearer
                                         connStateIdSupply confSock
                                         socket0 (Just localAddr0)
                                         [accumulatorInit clientAndServerData0]
                                         nextRequests0
                                         noTimeLimitsHandshake
                                         maxAcceptedConnectionsLimit
        (\ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  [req]
connectionManager0 peerAddr
_serverAddr0 Async m Void
_serverAsync0 -> do
          String
-> Timeouts
-> Tracer m (WithName String (RemoteTransitionTrace peerAddr))
-> Tracer m (WithName String (AbstractTransitionTrace ConnStateId))
-> Tracer
     m
     (WithName
        String
        (Trace
           peerAddr
           (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
-> Tracer m (WithName String (Trace peerAddr))
-> Tracer m (WithName String (Debug peerAddr DataFlowProtocolData))
-> StdGen
-> Snocket m socket peerAddr
-> MakeBearer m socket
-> ConnStateIdSupply m
-> (socket -> m ())
-> socket
-> Maybe peerAddr
-> [req]
-> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
-> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
-> AcceptedConnectionsLimit
-> (ConnectionManagerWithExpandedCtx
      'InitiatorResponderMode
      socket
      peerAddr
      DataFlowProtocolData
      UnversionedProtocol
      ByteString
      m
      [resp]
      [req]
    -> peerAddr -> Async m Void -> m Property)
-> m Property
forall name peerAddr socket acc req resp (m :: * -> *) a.
(ConnectionManagerMonad m, acc ~ [req], resp ~ [req], Ord peerAddr,
 Show peerAddr, Typeable peerAddr, Serialise req, Typeable req,
 MonadAsync m, MonadDelay m, MonadFix m, MonadLabelledSTM m,
 MonadTraceSTM m, MonadSay m, Show req, Show name) =>
name
-> Timeouts
-> Tracer m (WithName name (RemoteTransitionTrace peerAddr))
-> Tracer m (WithName name (AbstractTransitionTrace ConnStateId))
-> Tracer
     m
     (WithName
        name
        (Trace
           peerAddr
           (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
-> Tracer m (WithName name (Trace peerAddr))
-> Tracer m (WithName name (Debug peerAddr DataFlowProtocolData))
-> StdGen
-> Snocket m socket peerAddr
-> MakeBearer m socket
-> ConnStateIdSupply m
-> (socket -> m ())
-> socket
-> Maybe peerAddr
-> acc
-> TemperatureBundle (ConnectionId peerAddr -> STM m [req])
-> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
-> AcceptedConnectionsLimit
-> (ConnectionManagerWithExpandedCtx
      'InitiatorResponderMode
      socket
      peerAddr
      DataFlowProtocolData
      UnversionedProtocol
      ByteString
      m
      [resp]
      acc
    -> peerAddr -> Async m Void -> m a)
-> m a
withBidirectionalConnectionManager String
"node-1" Timeouts
timeouts
                                             Tracer m (WithName String (RemoteTransitionTrace peerAddr))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer Tracer m (WithName String (AbstractTransitionTrace ConnStateId))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer Tracer
  m
  (WithName
     String
     (Trace
        peerAddr
        (ConnectionHandlerTrace UnversionedProtocol DataFlowProtocolData)))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer Tracer m (WithName String (Trace peerAddr))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
                                             Tracer m (WithName String (Debug peerAddr DataFlowProtocolData))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer StdGen
stdGen'' Snocket m socket peerAddr
snocket MakeBearer m socket
makeBearer
                                             ConnStateIdSupply m
connStateIdSupply socket -> m ()
confSock
                                             socket
socket1 (peerAddr -> Maybe peerAddr
forall a. a -> Maybe a
Just peerAddr
localAddr1)
                                             [ClientAndServerData req -> req
forall req. ClientAndServerData req -> req
accumulatorInit ClientAndServerData req
clientAndServerData1]
                                             TemperatureBundle (ConnectionId peerAddr -> STM m [req])
nextRequests1
                                             ProtocolTimeLimits (Handshake UnversionedProtocol Term)
forall {k} (vNumber :: k).
ProtocolTimeLimits (Handshake vNumber Term)
noTimeLimitsHandshake
                                             AcceptedConnectionsLimit
maxAcceptedConnectionsLimit
            (\ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  [req]
connectionManager1 peerAddr
_serverAddr1 Async m Void
serverAsync1 -> do
              Async m Void -> m ()
forall (m :: * -> *) a.
(MonadAsync m, MonadFork m, MonadMask m) =>
Async m a -> m ()
link Async m Void
serverAsync1
              -- runInitiatorProtocols returns a list of results per each
              -- protocol in each bucket (warm \/ hot \/ established); but
              -- we run only one mini-protocol. We can use `concat` to
              -- flatten the results.
              ( rs0 :: [Either SomeException (TemperatureBundle [resp])]
                , rs1 :: [Either SomeException (TemperatureBundle [resp])]
                ) <-
                -- Run initiator twice; this tests if the responders on
                -- the other end are restarted.
                Int
-> m (Either SomeException (TemperatureBundle [resp]))
-> m [Either SomeException (TemperatureBundle [resp])]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM
                  (ClientAndServerData req -> Int
forall req. ClientAndServerData req -> Int
numberOfRounds ClientAndServerData req
clientAndServerData0)
                  (m (Connected
     peerAddr
     (HandleWithExpandedCtx
        'InitiatorResponderMode
        peerAddr
        DataFlowProtocolData
        ByteString
        m
        [resp]
        [req])
     (HandleError 'InitiatorResponderMode UnversionedProtocol))
-> (Connected
      peerAddr
      (HandleWithExpandedCtx
         'InitiatorResponderMode
         peerAddr
         DataFlowProtocolData
         ByteString
         m
         [resp]
         [req])
      (HandleError 'InitiatorResponderMode UnversionedProtocol)
    -> m (OperationResult AbstractState))
-> (Connected
      peerAddr
      (HandleWithExpandedCtx
         'InitiatorResponderMode
         peerAddr
         DataFlowProtocolData
         ByteString
         m
         [resp]
         [req])
      (HandleError 'InitiatorResponderMode UnversionedProtocol)
    -> m (Either SomeException (TemperatureBundle [resp])))
-> m (Either SomeException (TemperatureBundle [resp]))
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket
                    (Bool
-> StrictTMVar m ()
-> m (Connected
        peerAddr
        (HandleWithExpandedCtx
           'InitiatorResponderMode
           peerAddr
           DataFlowProtocolData
           ByteString
           m
           [resp]
           [req])
        (HandleError 'InitiatorResponderMode UnversionedProtocol))
-> m (Connected
        peerAddr
        (HandleWithExpandedCtx
           'InitiatorResponderMode
           peerAddr
           DataFlowProtocolData
           ByteString
           m
           [resp]
           [req])
        (HandleError 'InitiatorResponderMode UnversionedProtocol))
forall (m :: * -> *) a.
(MonadSTM m, MonadThrow m) =>
Bool -> StrictTMVar m () -> m a -> m a
withLock Bool
useLock StrictTMVar m ()
lock
                      (ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  [req]
-> AcquireOutboundConnection
     peerAddr
     (HandleWithExpandedCtx
        'InitiatorResponderMode
        peerAddr
        DataFlowProtocolData
        ByteString
        m
        [resp]
        [req])
     (HandleError 'InitiatorResponderMode UnversionedProtocol)
     m
forall (muxMode :: Mode) socket peerAddr handle handleError
       (m :: * -> *).
(HasInitiator muxMode ~ 'True) =>
ConnectionManager muxMode socket peerAddr handle handleError m
-> AcquireOutboundConnection peerAddr handle handleError m
acquireOutboundConnection
                        ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  [req]
connectionManager0
                        DiffusionMode
InitiatorAndResponderDiffusionMode
                        peerAddr
localAddr1))
                    (\case
                      Connected ConnectionId peerAddr
connId DataFlow
_ HandleWithExpandedCtx
  'InitiatorResponderMode
  peerAddr
  DataFlowProtocolData
  ByteString
  m
  [resp]
  [req]
_ ->
                        ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  [req]
-> ConnectionId peerAddr -> m (OperationResult AbstractState)
forall (muxMode :: Mode) socket peerAddr handle handleError
       (m :: * -> *).
(HasInitiator muxMode ~ 'True) =>
ConnectionManager muxMode socket peerAddr handle handleError m
-> ConnectionId peerAddr -> m (OperationResult AbstractState)
releaseOutboundConnection
                          ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  [req]
connectionManager0
                          ConnectionId peerAddr
connId
                      Disconnected {} ->
                        String -> m (OperationResult AbstractState)
forall a. HasCallStack => String -> a
error String
"bidirectionalExperiment: impossible happened")
                    (\Connected
  peerAddr
  (HandleWithExpandedCtx
     'InitiatorResponderMode
     peerAddr
     DataFlowProtocolData
     ByteString
     m
     [resp]
     [req])
  (HandleError 'InitiatorResponderMode UnversionedProtocol)
connHandle ->
                      case Connected
  peerAddr
  (HandleWithExpandedCtx
     'InitiatorResponderMode
     peerAddr
     DataFlowProtocolData
     ByteString
     m
     [resp]
     [req])
  (HandleError 'InitiatorResponderMode UnversionedProtocol)
connHandle of
                        Connected ConnectionId peerAddr
connId DataFlow
_ (Handle Mux 'InitiatorResponderMode m
mux OuroborosBundle
  'InitiatorResponderMode
  (ExpandedInitiatorContext peerAddr m)
  (ResponderContext peerAddr)
  ByteString
  m
  [resp]
  [req]
muxBundle TemperatureBundle (StrictTVar m ControlMessage)
controlBundle DataFlowProtocolData
_) -> do
                          forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try @_ @SomeException (m (TemperatureBundle [resp])
 -> m (Either SomeException (TemperatureBundle [resp])))
-> m (TemperatureBundle [resp])
-> m (Either SomeException (TemperatureBundle [resp]))
forall a b. (a -> b) -> a -> b
$
                            SingMuxMode 'InitiatorResponderMode
-> Mux 'InitiatorResponderMode m
-> OuroborosBundle
     'InitiatorResponderMode
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     ByteString
     m
     [resp]
     [req]
-> TemperatureBundle (StrictTVar m ControlMessage)
-> ConnectionId peerAddr
-> m (TemperatureBundle [resp])
forall (muxMode :: Mode) addr (m :: * -> *) a b.
(Alternative (STM m), MonadAsync m, MonadCatch m,
 MonadLabelledSTM m, MonadMask m, MonadSTM m, MonadThrow (STM m),
 HasInitiator muxMode ~ 'True, MonadSay m) =>
SingMuxMode muxMode
-> Mux muxMode m
-> OuroborosBundle
     muxMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     b
-> TemperatureBundle (StrictTVar m ControlMessage)
-> ConnectionId addr
-> m (TemperatureBundle a)
runInitiatorProtocols
                              SingMuxMode 'InitiatorResponderMode
SingInitiatorResponderMode
                              Mux 'InitiatorResponderMode m
mux OuroborosBundle
  'InitiatorResponderMode
  (ExpandedInitiatorContext peerAddr m)
  (ResponderContext peerAddr)
  ByteString
  m
  [resp]
  [req]
muxBundle TemperatureBundle (StrictTVar m ControlMessage)
controlBundle ConnectionId peerAddr
connId
                        Disconnected ConnectionId peerAddr
_ Maybe (HandleError 'InitiatorResponderMode UnversionedProtocol)
err ->
                          IOError -> m (Either SomeException (TemperatureBundle [resp]))
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (String -> IOError
userError (String -> IOError) -> String -> IOError
forall a b. (a -> b) -> a -> b
$ String
"bidirectionalExperiment: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Maybe (HandleError 'InitiatorResponderMode UnversionedProtocol)
-> String
forall a. Show a => a -> String
show Maybe (HandleError 'InitiatorResponderMode UnversionedProtocol)
err)
                  ))
                m [Either SomeException (TemperatureBundle [resp])]
-> m [Either SomeException (TemperatureBundle [resp])]
-> m ([Either SomeException (TemperatureBundle [resp])],
      [Either SomeException (TemperatureBundle [resp])])
forall a b. m a -> m b -> m (a, b)
forall (m :: * -> *) a b. MonadAsync m => m a -> m b -> m (a, b)
`concurrently`
                Int
-> m (Either SomeException (TemperatureBundle [resp]))
-> m [Either SomeException (TemperatureBundle [resp])]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM
                  (ClientAndServerData req -> Int
forall req. ClientAndServerData req -> Int
numberOfRounds ClientAndServerData req
clientAndServerData1)
                  (m (Connected
     peerAddr
     (HandleWithExpandedCtx
        'InitiatorResponderMode
        peerAddr
        DataFlowProtocolData
        ByteString
        m
        [resp]
        [req])
     (HandleError 'InitiatorResponderMode UnversionedProtocol))
-> (Connected
      peerAddr
      (HandleWithExpandedCtx
         'InitiatorResponderMode
         peerAddr
         DataFlowProtocolData
         ByteString
         m
         [resp]
         [req])
      (HandleError 'InitiatorResponderMode UnversionedProtocol)
    -> m (OperationResult AbstractState))
-> (Connected
      peerAddr
      (HandleWithExpandedCtx
         'InitiatorResponderMode
         peerAddr
         DataFlowProtocolData
         ByteString
         m
         [resp]
         [req])
      (HandleError 'InitiatorResponderMode UnversionedProtocol)
    -> m (Either SomeException (TemperatureBundle [resp])))
-> m (Either SomeException (TemperatureBundle [resp]))
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket
                    (Bool
-> StrictTMVar m ()
-> m (Connected
        peerAddr
        (HandleWithExpandedCtx
           'InitiatorResponderMode
           peerAddr
           DataFlowProtocolData
           ByteString
           m
           [resp]
           [req])
        (HandleError 'InitiatorResponderMode UnversionedProtocol))
-> m (Connected
        peerAddr
        (HandleWithExpandedCtx
           'InitiatorResponderMode
           peerAddr
           DataFlowProtocolData
           ByteString
           m
           [resp]
           [req])
        (HandleError 'InitiatorResponderMode UnversionedProtocol))
forall (m :: * -> *) a.
(MonadSTM m, MonadThrow m) =>
Bool -> StrictTMVar m () -> m a -> m a
withLock Bool
useLock StrictTMVar m ()
lock
                      (ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  [req]
-> AcquireOutboundConnection
     peerAddr
     (HandleWithExpandedCtx
        'InitiatorResponderMode
        peerAddr
        DataFlowProtocolData
        ByteString
        m
        [resp]
        [req])
     (HandleError 'InitiatorResponderMode UnversionedProtocol)
     m
forall (muxMode :: Mode) socket peerAddr handle handleError
       (m :: * -> *).
(HasInitiator muxMode ~ 'True) =>
ConnectionManager muxMode socket peerAddr handle handleError m
-> AcquireOutboundConnection peerAddr handle handleError m
acquireOutboundConnection
                        ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  [req]
connectionManager1
                        DiffusionMode
InitiatorAndResponderDiffusionMode
                        peerAddr
localAddr0))
                    (\case
                      Connected ConnectionId peerAddr
connId DataFlow
_ HandleWithExpandedCtx
  'InitiatorResponderMode
  peerAddr
  DataFlowProtocolData
  ByteString
  m
  [resp]
  [req]
_ ->
                        ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  [req]
-> ConnectionId peerAddr -> m (OperationResult AbstractState)
forall (muxMode :: Mode) socket peerAddr handle handleError
       (m :: * -> *).
(HasInitiator muxMode ~ 'True) =>
ConnectionManager muxMode socket peerAddr handle handleError m
-> ConnectionId peerAddr -> m (OperationResult AbstractState)
releaseOutboundConnection
                          ConnectionManagerWithExpandedCtx
  'InitiatorResponderMode
  socket
  peerAddr
  DataFlowProtocolData
  UnversionedProtocol
  ByteString
  m
  [resp]
  [req]
connectionManager1
                          ConnectionId peerAddr
connId
                      Disconnected {} ->
                        String -> m (OperationResult AbstractState)
forall a. HasCallStack => String -> a
error String
"ibidirectionalExperiment: impossible happened")
                    (\Connected
  peerAddr
  (HandleWithExpandedCtx
     'InitiatorResponderMode
     peerAddr
     DataFlowProtocolData
     ByteString
     m
     [resp]
     [req])
  (HandleError 'InitiatorResponderMode UnversionedProtocol)
connHandle ->
                      case Connected
  peerAddr
  (HandleWithExpandedCtx
     'InitiatorResponderMode
     peerAddr
     DataFlowProtocolData
     ByteString
     m
     [resp]
     [req])
  (HandleError 'InitiatorResponderMode UnversionedProtocol)
connHandle of
                        Connected ConnectionId peerAddr
connId DataFlow
_ (Handle Mux 'InitiatorResponderMode m
mux OuroborosBundle
  'InitiatorResponderMode
  (ExpandedInitiatorContext peerAddr m)
  (ResponderContext peerAddr)
  ByteString
  m
  [resp]
  [req]
muxBundle TemperatureBundle (StrictTVar m ControlMessage)
controlBundle DataFlowProtocolData
_) -> do
                          forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try @_ @SomeException (m (TemperatureBundle [resp])
 -> m (Either SomeException (TemperatureBundle [resp])))
-> m (TemperatureBundle [resp])
-> m (Either SomeException (TemperatureBundle [resp]))
forall a b. (a -> b) -> a -> b
$
                            SingMuxMode 'InitiatorResponderMode
-> Mux 'InitiatorResponderMode m
-> OuroborosBundle
     'InitiatorResponderMode
     (ExpandedInitiatorContext peerAddr m)
     (ResponderContext peerAddr)
     ByteString
     m
     [resp]
     [req]
-> TemperatureBundle (StrictTVar m ControlMessage)
-> ConnectionId peerAddr
-> m (TemperatureBundle [resp])
forall (muxMode :: Mode) addr (m :: * -> *) a b.
(Alternative (STM m), MonadAsync m, MonadCatch m,
 MonadLabelledSTM m, MonadMask m, MonadSTM m, MonadThrow (STM m),
 HasInitiator muxMode ~ 'True, MonadSay m) =>
SingMuxMode muxMode
-> Mux muxMode m
-> OuroborosBundle
     muxMode
     (ExpandedInitiatorContext addr m)
     (ResponderContext addr)
     ByteString
     m
     a
     b
-> TemperatureBundle (StrictTVar m ControlMessage)
-> ConnectionId addr
-> m (TemperatureBundle a)
runInitiatorProtocols
                              SingMuxMode 'InitiatorResponderMode
SingInitiatorResponderMode
                              Mux 'InitiatorResponderMode m
mux OuroborosBundle
  'InitiatorResponderMode
  (ExpandedInitiatorContext peerAddr m)
  (ResponderContext peerAddr)
  ByteString
  m
  [resp]
  [req]
muxBundle TemperatureBundle (StrictTVar m ControlMessage)
controlBundle ConnectionId peerAddr
connId
                        Disconnected ConnectionId peerAddr
_ Maybe (HandleError 'InitiatorResponderMode UnversionedProtocol)
err ->
                          IOError -> m (Either SomeException (TemperatureBundle [resp]))
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (String -> IOError
userError (String -> IOError) -> String -> IOError
forall a b. (a -> b) -> a -> b
$ String
"bidirectionalExperiment: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Maybe (HandleError 'InitiatorResponderMode UnversionedProtocol)
-> String
forall a. Show a => a -> String
show Maybe (HandleError 'InitiatorResponderMode UnversionedProtocol)
err)
                  ))

              pure $
                foldr
                  (\ (Either SomeException (TemperatureBundle [resp])
r, TemperatureBundle [resp]
expected) Property
acc ->
                    case Either SomeException (TemperatureBundle [resp])
r of
                      Left SomeException
err -> String -> Bool -> Property
forall prop. Testable prop => String -> prop -> Property
counterexample (SomeException -> String
forall a. Show a => a -> String
show SomeException
err) Bool
False
                      Right TemperatureBundle [resp]
a  -> TemperatureBundle [resp]
a TemperatureBundle [resp] -> TemperatureBundle [resp] -> Property
forall a. (Eq a, Show a) => a -> a -> Property
=== TemperatureBundle [resp]
expected Property -> Property -> Property
forall prop1 prop2.
(Testable prop1, Testable prop2) =>
prop1 -> prop2 -> Property
.&&. Property
acc)
                  (property True)
                  (zip rs0 (expectedResult clientAndServerData0 clientAndServerData1))
                .&&.
                foldr
                  (\ (Either SomeException (TemperatureBundle [resp])
r, TemperatureBundle [resp]
expected) Property
acc ->
                    case Either SomeException (TemperatureBundle [resp])
r of
                      Left SomeException
err -> String -> Bool -> Property
forall prop. Testable prop => String -> prop -> Property
counterexample (SomeException -> String
forall a. Show a => a -> String
show SomeException
err) Bool
False
                      Right TemperatureBundle [resp]
a  -> TemperatureBundle [resp]
a TemperatureBundle [resp] -> TemperatureBundle [resp] -> Property
forall a. (Eq a, Show a) => a -> a -> Property
=== TemperatureBundle [resp]
expected Property -> Property -> Property
forall prop1 prop2.
(Testable prop1, Testable prop2) =>
prop1 -> prop2 -> Property
.&&. Property
acc)
                  (property True)
                  (zip rs1 (expectedResult clientAndServerData1 clientAndServerData0))
                ))


--
-- Utils
--


-- | Redefine this tracer to get valuable tracing information from various
-- components:
--
-- * connection-manager
-- * inbound governor
-- * server
--
-- debugTracer :: (MonadSay m, MonadTime m, Show a) => Tracer m a
-- debugTracer = Tracer (\msg -> (,msg) <$> getCurrentTime >>= say . show)
           -- <> Tracer Debug.traceShowM


withLock :: ( MonadSTM   m
            , MonadThrow m
            )
         => Bool
         -> StrictTMVar m ()
         -> m a
         -> m a
withLock :: forall (m :: * -> *) a.
(MonadSTM m, MonadThrow m) =>
Bool -> StrictTMVar m () -> m a -> m a
withLock Bool
False StrictTMVar m ()
_v m a
m = m a
m
withLock Bool
True   StrictTMVar m ()
v m a
m =
    m () -> (() -> m ()) -> (() -> m a) -> m a
forall a b c. m a -> (a -> m b) -> (a -> m c) -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket (STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTMVar m () -> STM m ()
forall (m :: * -> *) a. MonadSTM m => StrictTMVar m a -> STM m a
takeTMVar StrictTMVar m ()
v)
            (STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> (() -> STM m ()) -> () -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StrictTMVar m () -> () -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTMVar m a -> a -> STM m ()
putTMVar StrictTMVar m ()
v)
            (m a -> () -> m a
forall a b. a -> b -> a
const m a
m)