{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE QuantifiedConstraints #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE UndecidableInstances #-}
module Ouroboros.Network.Driver.Limits
(
ProtocolSizeLimits (..)
, ProtocolTimeLimits (..)
, ProtocolLimitFailure (..)
, runPeerWithLimits
, runPipelinedPeerWithLimits
, TraceSendRecv (..)
, driverWithLimits
, runConnectedPeersWithLimits
, runConnectedPipelinedPeersWithLimits
) where
import Data.Maybe (fromMaybe)
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer (..), contramap, traceWith)
import Network.Mux.Timeout
import Network.TypedProtocol.Codec
import Network.TypedProtocol.Core
import Network.TypedProtocol.Driver
import Network.TypedProtocol.Peer
import Ouroboros.Network.Channel
import Ouroboros.Network.Driver.Simple
import Ouroboros.Network.Protocol.Limits
import Ouroboros.Network.Util.ShowProxy
-- | Build a 'Driver' over a 'Channel' that applies per-state size and time
-- limits to every incoming message.
--
-- * Sending: the message is encoded with the codec, written to the channel,
--   and then traced as 'TraceSendMsg'.
--
-- * Receiving: a decoder for the current protocol state is run through
--   'runDecoderWithLimit' inside the supplied timeout function.  On success
--   the message is traced as 'TraceRecvMsg' and returned together with any
--   trailing input.  Otherwise one of 'DecoderFailure', 'ExceededSizeLimit'
--   or 'ExceededTimeLimit' is thrown with 'throwIO'.
--
-- The driver state is the unconsumed trailing input (@Maybe bytes@); it
-- starts out as 'Nothing'.
--
driverWithLimits :: forall ps (pr :: PeerRole) failure bytes m.
                    ( MonadThrow m
                    , ShowProxy ps
                    , forall (st' :: ps) tok. tok ~ StateToken st' => Show tok
                    , Show failure
                    )
                 => Tracer m (TraceSendRecv ps)
                 -> TimeoutFn m
                 -> Codec ps failure m bytes
                 -> ProtocolSizeLimits ps bytes
                 -> ProtocolTimeLimits ps
                 -> Channel m bytes
                 -> Driver ps pr (Maybe bytes) m
driverWithLimits tracer timeoutFn
                 Codec{encode, decode}
                 ProtocolSizeLimits{sizeLimitForState, dataSize}
                 ProtocolTimeLimits{timeLimitForState}
                 channel@Channel{send} =
    Driver { sendMessage, recvMessage, initialDState = Nothing }
  where
    -- Encode and send one message, then trace it.
    sendMessage :: forall (st :: ps) (st' :: ps).
                   StateTokenI st
                => ActiveState st
                => WeHaveAgencyProof pr st
                -> Message ps st st'
                -> m ()
    sendMessage !_ msg = do
      send (encode msg)
      traceWith tracer (TraceSendMsg (AnyMessage msg))

    -- Receive one message for state @st@, starting from the trailing input
    -- left over by the previous receive (if any).
    recvMessage :: forall (st :: ps).
                   StateTokenI st
                => ActiveState st
                => TheyHaveAgencyProof pr st
                -> Maybe bytes
                -> m (SomeMessage st, Maybe bytes)
    recvMessage !_ trailing = do
      let tok :: StateToken st
          tok = stateToken
      decoder <- decode tok
      let sizeLimit = sizeLimitForState tok
          -- A state without a time limit is passed to the timeout function
          -- as a negative duration.
          -- NOTE(review): presumably 'TimeoutFn' treats a negative duration
          -- as "no timeout" -- confirm against "Network.Mux.Timeout".
          timeLimit = fromMaybe (-1) (timeLimitForState tok)
      result <- timeoutFn timeLimit $
                  runDecoderWithLimit sizeLimit dataSize
                                      channel trailing decoder
      case result of
        Just (Right x@(SomeMessage msg, _trailing')) -> do
          traceWith tracer (TraceRecvMsg (AnyMessage msg))
          return x
        -- The decoder itself rejected the input.
        Just (Left (Just failure)) -> throwIO (DecoderFailure tok failure)
        -- 'runDecoderWithLimit' stopped because the size limit was exceeded.
        Just (Left Nothing)        -> throwIO (ExceededSizeLimit tok)
        -- The timeout function gave up before a result was produced.
        Nothing                    -> throwIO (ExceededTimeLimit tok)
-- | Drive an incremental decoder with input from a 'Channel', tracking how
-- many bytes have been fed to it and stopping once a size limit is exceeded.
--
-- The result is:
--
-- * @Right (x, trailing)@ when decoding finished within the limit;
-- * @Left (Just failure)@ when the decoder reported a failure;
-- * @Left Nothing@ when the size limit was exceeded.
--
runDecoderWithLimit
    :: forall m bytes failure a. Monad m
    => Word
    -- ^ message size limit
    -> (bytes -> Word)
    -- ^ size of a chunk of input
    -> Channel m bytes
    -> Maybe bytes
    -- ^ trailing input to feed to the decoder before reading the channel
    -> DecodeStep bytes failure m a
    -> m (Either (Maybe failure) (a, Maybe bytes))
runDecoderWithLimit limit size Channel{recv} =
    go 0
  where
    -- The first argument is the total size of the input handed to the
    -- decoder so far; the second is trailing input not yet handed over.
    go :: Word
       -> Maybe bytes
       -> DecodeStep bytes failure m a
       -> m (Either (Maybe failure) (a, Maybe bytes))

    -- Finished: input the decoder handed back as trailing was not consumed
    -- by this message, so it is discounted before comparing with the limit.
    -- NOTE(review): this is 'Word' subtraction; it assumes the decoder never
    -- returns more trailing input than it was given -- confirm.
    go !sz !_ (DecodeDone x trailing)
      | let sz' = sz - maybe 0 size trailing
      , sz' > limit = return (Left Nothing)
      | otherwise   = return (Right (x, trailing))

    go !_ !_ (DecodeFail failure) = return (Left (Just failure))

    -- The decoder wants more input: check the limit before supplying it,
    -- then feed the pending trailing chunk if there is one, otherwise read
    -- from the channel.
    go !sz trailing (DecodePartial k)
      | sz > limit = return (Left Nothing)
      | otherwise  = case trailing of
          Nothing -> do
            mbs <- recv
            let !sz' = sz + maybe 0 size mbs
            go sz' Nothing =<< k mbs
          Just bs -> do
            let !sz' = sz + size bs
            go sz' Nothing =<< k (Just bs)
-- | Run a non-pipelined 'Peer' over a 'Channel' using 'driverWithLimits'.
--
-- The timeout function handed to the driver is obtained from
-- 'withTimeoutSerial' and is only valid inside its continuation, which is
-- why the driver is constructed there.
--
-- Returns the peer's result together with the final driver state, i.e. any
-- trailing input that was received but not consumed.
--
runPeerWithLimits
  :: forall ps (st :: ps) pr failure bytes m a .
     ( MonadAsync m
     , MonadFork m
     , MonadMask m
     , MonadThrow (STM m)
     , MonadTimer m
     , ShowProxy ps
     , forall (st' :: ps) stok. stok ~ StateToken st' => Show stok
     , Show failure
     )
  => Tracer m (TraceSendRecv ps)
  -> Codec ps failure m bytes
  -> ProtocolSizeLimits ps bytes
  -> ProtocolTimeLimits ps
  -> Channel m bytes
  -> Peer ps pr NonPipelined st m a
  -> m (a, Maybe bytes)
runPeerWithLimits tracer codec slimits tlimits channel peer =
    withTimeoutSerial $ \timeoutFn ->
      let driver :: Driver ps pr (Maybe bytes) m
          driver = driverWithLimits tracer timeoutFn codec
                                    slimits tlimits channel
      in runPeerWithDriver driver peer
-- | Run a pipelined peer ('PeerPipelined') over a 'Channel' using
-- 'driverWithLimits'.
--
-- As with the non-pipelined variant, the driver is built inside the
-- continuation of 'withTimeoutSerial', which supplies the timeout function.
--
-- Returns the peer's result together with the final driver state, i.e. any
-- trailing input that was received but not consumed.
--
runPipelinedPeerWithLimits
  :: forall ps (st :: ps) pr failure bytes m a.
     ( MonadAsync m
     , MonadFork m
     , MonadMask m
     , MonadTimer m
     , MonadThrow (STM m)
     , ShowProxy ps
     , forall (st' :: ps) stok. stok ~ StateToken st' => Show stok
     , Show failure
     )
  => Tracer m (TraceSendRecv ps)
  -> Codec ps failure m bytes
  -> ProtocolSizeLimits ps bytes
  -> ProtocolTimeLimits ps
  -> Channel m bytes
  -> PeerPipelined ps pr st m a
  -> m (a, Maybe bytes)
runPipelinedPeerWithLimits tracer codec slimits tlimits channel peer =
    withTimeoutSerial $ \timeoutFn ->
      let driver :: Driver ps pr (Maybe bytes) m
          driver = driverWithLimits tracer timeoutFn codec
                                    slimits tlimits channel
      in runPipelinedPeerWithDriver driver peer
-- | Run two non-pipelined peers against each other over a freshly created
-- pair of channels, sharing one 'Codec'.
--
-- The first argument creates the two channels; they are expected to be
-- connected to each other.  The client runs with 'runPeerWithLimits' (and so
-- is subject to the given size and time limits); the server runs with plain
-- 'runPeer', without limits.  Both run concurrently, each in a thread
-- labelled @"client"@ / @"server"@, and trace events are tagged with the
-- corresponding 'Role'.
--
-- Any trailing input left by either side is discarded; only the two peer
-- results are returned.
--
runConnectedPeersWithLimits
  :: forall ps pr st failure bytes m a b.
     ( MonadAsync m
     , MonadFork m
     , MonadMask m
     , MonadTimer m
     , MonadThrow (STM m)
     , Exception failure
     , ShowProxy ps
     , forall (st' :: ps) sing. sing ~ StateToken st' => Show sing
     )
  => m (Channel m bytes, Channel m bytes)
  -> Tracer m (Role, TraceSendRecv ps)
  -> Codec ps failure m bytes
  -> ProtocolSizeLimits ps bytes
  -> ProtocolTimeLimits ps
  -> Peer ps             pr  NonPipelined st m a
  -> Peer ps (FlipAgency pr) NonPipelined st m b
  -> m (a, b)
runConnectedPeersWithLimits createChannels tracer codec slimits tlimits client server =
    createChannels >>= \(clientChannel, serverChannel) ->

    (do labelThisThread "client"
        fst <$> runPeerWithLimits
                  tracerClient codec slimits tlimits
                  clientChannel client)
      `concurrently`
    (do labelThisThread "server"
        fst <$> runPeer tracerServer codec serverChannel server)
  where
    -- Tag every trace event with the side that produced it.
    tracerClient :: Tracer m (TraceSendRecv ps)
    tracerClient = contramap ((,) Client) tracer

    tracerServer :: Tracer m (TraceSendRecv ps)
    tracerServer = contramap ((,) Server) tracer
-- | Run a pipelined client against a non-pipelined server over a freshly
-- created pair of channels, sharing one 'Codec'.
--
-- The first argument creates the two channels; they are expected to be
-- connected to each other.  The client runs with
-- 'runPipelinedPeerWithLimits' (and so is subject to the given size and time
-- limits); the server runs with plain 'runPeer', without limits.  Both run
-- concurrently, and trace events are tagged with the corresponding 'Role'.
--
-- Any trailing input left by either side is discarded; only the two peer
-- results are returned.
--
runConnectedPipelinedPeersWithLimits
  :: forall ps pr st failure bytes m a b.
     ( MonadAsync m
     , MonadFork m
     , MonadMask m
     , MonadTimer m
     , MonadThrow (STM m)
     , Exception failure
     , ShowProxy ps
     , forall (st' :: ps) sing. sing ~ StateToken st' => Show sing
     )
  => m (Channel m bytes, Channel m bytes)
  -> Tracer m (Role, TraceSendRecv ps)
  -> Codec ps failure m bytes
  -> ProtocolSizeLimits ps bytes
  -> ProtocolTimeLimits ps
  -> PeerPipelined ps    pr               st m a
  -> Peer ps (FlipAgency pr) NonPipelined st m b
  -> m (a, b)
runConnectedPipelinedPeersWithLimits createChannels tracer codec slimits tlimits client server =
    createChannels >>= \(clientChannel, serverChannel) ->

    (fst <$> runPipelinedPeerWithLimits
               tracerClient codec slimits tlimits
               clientChannel client)
      `concurrently`
    (fst <$> runPeer tracerServer codec serverChannel server)
  where
    -- Tag every trace event with the side that produced it.
    tracerClient :: Tracer m (TraceSendRecv ps)
    tracerClient = contramap ((,) Client) tracer

    tracerServer :: Tracer m (TraceSendRecv ps)
    tracerServer = contramap ((,) Server) tracer