{-# LANGUAGE BangPatterns          #-}
{-# LANGUAGE DataKinds             #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE GADTs                 #-}
{-# LANGUAGE NamedFieldPuns        #-}
{-# LANGUAGE PolyKinds             #-}
{-# LANGUAGE QuantifiedConstraints #-}
{-# LANGUAGE RankNTypes            #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-# LANGUAGE TypeApplications      #-}
{-# LANGUAGE TypeFamilies          #-}
{-# LANGUAGE TypeOperators         #-}
{-# LANGUAGE UndecidableInstances  #-}

-- | Drivers for running 'Peer's.
--
module Ouroboros.Network.Driver.Limits
  ( -- * Limits
    ProtocolSizeLimits (..)
  , ProtocolTimeLimits (..)
  , ProtocolLimitFailure (..)
    -- * Normal peers
  , runPeerWithLimits
  , runPipelinedPeerWithLimits
  , TraceSendRecv (..)
    -- * Driver utilities
  , driverWithLimits
  , runConnectedPeersWithLimits
  , runConnectedPipelinedPeersWithLimits
  ) where

import Data.Maybe (fromMaybe)

import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer (..), contramap, traceWith)

import Network.Mux.Timeout
import Network.TypedProtocol.Codec
import Network.TypedProtocol.Core
import Network.TypedProtocol.Driver
import Network.TypedProtocol.Peer

import Ouroboros.Network.Channel
import Ouroboros.Network.Driver.Simple
import Ouroboros.Network.Protocol.Limits
import Ouroboros.Network.Util.ShowProxy


-- | Construct a 'Driver' which sends and receives messages over a 'Channel'
-- using the given 'Codec', traces every message, and enforces per-state size
-- and time limits on /incoming/ messages.
--
-- * Sending encodes the message, writes it to the channel and then traces it
--   as 'TraceSendMsg'.  No limits are applied to outgoing messages.
--
-- * Receiving runs the decoder for the current protocol state under the
--   supplied 'TimeoutFn', feeding it first any trailing input left over from
--   the previous message and then fresh input from the channel.  A
--   successfully decoded message is traced as 'TraceRecvMsg'.
--
-- The driver state is the trailing (not yet consumed) input; it is initially
-- 'Nothing'.
--
-- Receiving a message throws (with 'throwIO'):
--
-- * 'DecoderFailure' when the decoder fails;
-- * 'ExceededSizeLimit' when the message exceeds the size limit of the
--   current state;
-- * 'ExceededTimeLimit' when the timeout function gives up, i.e. returns
--   'Nothing'.
--
driverWithLimits :: forall ps (pr :: PeerRole) failure bytes m.
                    ( MonadThrow m
                    , ShowProxy ps
                    , forall (st' :: ps) tok. tok ~ StateToken st' => Show tok
                    , Show failure
                    )
                 => Tracer m (TraceSendRecv ps)
                 -> TimeoutFn m
                 -> Codec ps failure m bytes
                 -> ProtocolSizeLimits ps bytes
                 -> ProtocolTimeLimits ps
                 -> Channel m bytes
                 -> Driver ps pr (Maybe bytes) m
driverWithLimits tracer timeoutFn
                 Codec{encode, decode}
                 ProtocolSizeLimits{sizeLimitForState, dataSize}
                 ProtocolTimeLimits{timeLimitForState}
                 channel@Channel{send} =
    Driver { sendMessage, recvMessage, initialDState = Nothing }
  where
    -- Encode and send a single message, then trace it.  The agency proof is
    -- only forced, never inspected.
    sendMessage :: forall (st :: ps) (st' :: ps).
                   StateTokenI st
                => ActiveState st
                => WeHaveAgencyProof pr st
                -> Message ps st st'
                -> m ()
    sendMessage !_ msg = do
      send (encode msg)
      traceWith tracer (TraceSendMsg (AnyMessage msg))


    -- Receive a single message for state @st@, starting from the trailing
    -- input of the previous message, subject to the limits of that state.
    recvMessage :: forall (st :: ps).
                   StateTokenI st
                => ActiveState st
                => TheyHaveAgencyProof pr st
                -> Maybe bytes
                -> m (SomeMessage st, Maybe bytes)
    recvMessage !_ trailing = do
      let tok :: StateToken st
          tok = stateToken
      decoder <- decode tok
      let sizeLimit = sizeLimitForState @st tok
          -- A state without a time limit is passed to the timeout function
          -- as a negative duration.
          -- NOTE(review): presumably 'TimeoutFn' treats a negative duration
          -- as "no timeout" (as 'System.Timeout.timeout' does) -- confirm.
          timeLimit = fromMaybe (-1) (timeLimitForState @st tok)
      result  <- timeoutFn timeLimit $
                   runDecoderWithLimit sizeLimit dataSize
                                       channel trailing decoder

      case result of
        Just (Right x@(SomeMessage msg, _trailing')) -> do
          traceWith tracer (TraceRecvMsg (AnyMessage msg))
          return x
        -- the decoder rejected the input
        Just (Left (Just failure)) -> throwIO (DecoderFailure tok failure)
        -- the decoder consumed more than 'sizeLimit'
        Just (Left Nothing)        -> throwIO (ExceededSizeLimit tok)
        -- the timeout function gave up
        Nothing                    -> throwIO (ExceededTimeLimit tok)


-- | Run an incremental decoder against a 'Channel', enforcing a limit on the
-- amount of input consumed by a single message.
--
-- Input is taken first from the supplied trailing data (if any) and then
-- from the channel's 'recv'.
--
-- Results:
--
-- * @Right (x, trailing)@: the decoder finished with result @x@ and left
--   @trailing@ unconsumed;
-- * @Left (Just failure)@: the decoder failed;
-- * @Left Nothing@: the size limit was exceeded.
--
runDecoderWithLimit
    :: forall m bytes failure a. Monad m
    => Word
    -- ^ message size limit
    -> (bytes -> Word)
    -- ^ byte size
    -> Channel m bytes
    -> Maybe bytes
    -> DecodeStep bytes failure m a
    -> m (Either (Maybe failure) (a, Maybe bytes))
runDecoderWithLimit limit size Channel{recv} =
    go 0
  where
    -- Our strategy here is as follows...
    --
    -- We of course want to enforce the maximum data limit, but we also want to
    -- detect and report when we exceed the limit rather than having it be
    -- misclassified as a generic decode error. For example if we simply limited
    -- the decoder input to the maximum size then the failure would be reported
    -- as an unexpected end of input, rather than that the size limit was
    -- exceeded.
    --
    -- So our strategy is to allow the last chunk of input to exceed the limit.
    -- This leaves just one special case: if the decoder finishes with that
    -- final chunk, we must check if it consumed too much of the final chunk.
    --
    go :: Word        -- ^ size of consumed input so far
       -> Maybe bytes -- ^ any trailing data
       -> DecodeStep bytes failure m a
       -> m (Either (Maybe failure) (a, Maybe bytes))

    -- The decoder is done: the input it actually consumed is everything fed
    -- to it so far minus whatever it handed back as trailing data.
    go !sz !_ (DecodeDone x trailing)
      | let sz' = sz - maybe 0 size trailing
      , sz' > limit = return (Left Nothing)
      | otherwise   = return (Right (x, trailing))

    go !_ !_ (DecodeFail failure) = return (Left (Just failure))

    -- The decoder wants more input: refuse if we are already over the limit,
    -- otherwise feed it the pending trailing data, or else a fresh chunk
    -- from the channel ('Nothing' from 'recv' is passed on to the decoder
    -- and counts as zero bytes).
    go !sz trailing (DecodePartial k)
      | sz > limit = return (Left Nothing)
      | otherwise  = case trailing of
                       Nothing -> do mbs <- recv
                                     let !sz' = sz + maybe 0 size mbs
                                     go sz' Nothing =<< k mbs
                       Just bs -> do let !sz' = sz + size bs
                                     go sz' Nothing =<< k (Just bs)


-- | Run a (non-pipelined) peer with the given channel via the given codec,
-- enforcing the given size and time limits on incoming messages.
--
-- The peer is run with 'runPeerWithDriver' using a 'driverWithLimits' driver
-- whose timeout function is provided by 'withTimeoutSerial'.
--
-- Returns the peer's result together with the final driver state, i.e. any
-- trailing input that was received but not consumed.
--
runPeerWithLimits
  :: forall ps (st :: ps) pr failure bytes m a .
     ( MonadAsync m
     , MonadFork m
     , MonadMask m
     , MonadThrow (STM m)
     , MonadTimer m
     , ShowProxy ps
     , forall (st' :: ps) stok. stok ~ StateToken st' => Show stok
     , Show failure
     )
  => Tracer m (TraceSendRecv ps)
  -> Codec ps failure m bytes
  -> ProtocolSizeLimits ps bytes
  -> ProtocolTimeLimits ps
  -> Channel m bytes
  -> Peer ps pr NonPipelined st m a
  -> m (a, Maybe bytes)
runPeerWithLimits tracer codec slimits tlimits channel peer =
    withTimeoutSerial $ \timeoutFn ->
      let driver :: Driver ps pr (Maybe bytes) m
          driver = driverWithLimits tracer timeoutFn codec slimits tlimits channel
      in runPeerWithDriver driver peer

-- | Run a pipelined peer with the given channel via the given codec,
-- enforcing the given size and time limits on incoming messages.
--
-- This runs the peer to completion (if the protocol allows for termination).
--
-- Unlike normal peers, running pipelined peers relies on concurrency, hence
-- the 'MonadAsync' constraint.
--
-- The peer is run with 'runPipelinedPeerWithDriver' using a
-- 'driverWithLimits' driver whose timeout function is provided by
-- 'withTimeoutSerial'.  The result is paired with the final driver state,
-- i.e. any trailing input that was received but not consumed.
--
runPipelinedPeerWithLimits
  :: forall ps (st :: ps) pr failure bytes m a.
     ( MonadAsync m
     , MonadFork m
     , MonadMask m
     , MonadTimer m
     , MonadThrow (STM m)
     , ShowProxy ps
     , forall (st' :: ps) stok. stok ~ StateToken st' => Show stok
     , Show failure
     )
  => Tracer m (TraceSendRecv ps)
  -> Codec ps failure m bytes
  -> ProtocolSizeLimits ps bytes
  -> ProtocolTimeLimits ps
  -> Channel m bytes
  -> PeerPipelined ps pr st m a
  -> m (a, Maybe bytes)
runPipelinedPeerWithLimits tracer codec slimits tlimits channel peer =
    withTimeoutSerial $ \timeoutFn ->
      let driver :: Driver ps pr (Maybe bytes) m
          driver = driverWithLimits tracer timeoutFn codec slimits tlimits channel
      in runPipelinedPeerWithDriver driver peer


-- | Run two 'Peer's via a pair of connected 'Channel's and a common 'Codec'.
--
-- Only the client side uses 'driverWithLimits' (via 'runPeerWithLimits');
-- the server side is run with plain 'runPeer', so the size and time limits
-- are not applied to it.
--
-- This is useful for tests and quick experiments.
--
-- The first argument is expected to create two channels that are connected,
-- for example 'createConnectedChannels'.  The first channel of the pair is
-- given to the client, the second to the server.
--
-- Both peers run concurrently, in threads labelled @"client"@ and
-- @"server"@; their trace events are tagged with 'Client' and 'Server'
-- respectively.  Any trailing input left by either peer is discarded.
--
runConnectedPeersWithLimits
  :: forall ps pr st failure bytes m a b.
     ( MonadAsync       m
     , MonadFork        m
     , MonadMask        m
     , MonadTimer       m
     , MonadThrow  (STM m)
     , Exception failure
     , ShowProxy ps
     , forall (st' :: ps) sing. sing ~ StateToken st' => Show sing
     )
  => m (Channel m bytes, Channel m bytes)
  -> Tracer m (Role, TraceSendRecv ps)
  -> Codec ps failure m bytes
  -> ProtocolSizeLimits ps bytes
  -> ProtocolTimeLimits ps
  -> Peer ps             pr  NonPipelined st m a
  -> Peer ps (FlipAgency pr) NonPipelined st m b
  -> m (a, b)
runConnectedPeersWithLimits createChannels tracer codec slimits tlimits client server =
    createChannels >>= \(clientChannel, serverChannel) ->

    (do labelThisThread "client"
        fst <$> runPeerWithLimits
                        tracerClient codec slimits tlimits
                                     clientChannel client)
      `concurrently`
    (do labelThisThread "server"
        fst <$> runPeer tracerServer codec serverChannel server)
  where
    -- tag every trace event with the side that produced it
    tracerClient :: Tracer m (TraceSendRecv ps)
    tracerClient = contramap ((,) Client) tracer

    tracerServer :: Tracer m (TraceSendRecv ps)
    tracerServer = contramap ((,) Server) tracer


-- | Run a pipelined client and a non-pipelined server peer via a pair of
-- connected 'Channel's and a common 'Codec'.
--
-- Only the client side uses 'driverWithLimits' (via
-- 'runPipelinedPeerWithLimits'); the server side is run with plain
-- 'runPeer', so the size and time limits are not applied to it.
--
-- This is useful for tests and quick experiments.
--
-- The first argument is expected to create two channels that are connected,
-- for example 'createConnectedChannels'.  The first channel of the pair is
-- given to the client, the second to the server.
--
-- Both peers run concurrently; their trace events are tagged with 'Client'
-- and 'Server' respectively.  Any trailing input left by either peer is
-- discarded.
--
runConnectedPipelinedPeersWithLimits
  :: forall ps pr st failure bytes m a b.
     ( MonadAsync      m
     , MonadFork       m
     , MonadMask       m
     , MonadTimer      m
     , MonadThrow (STM m)
     , Exception failure
     , ShowProxy ps
     , forall (st' :: ps) sing. sing ~ StateToken st' => Show sing
     )
  => m (Channel m bytes, Channel m bytes)
  -> Tracer m (Role, TraceSendRecv ps)
  -> Codec ps failure m bytes
  -> ProtocolSizeLimits ps bytes
  -> ProtocolTimeLimits ps
  -> PeerPipelined ps    pr               st m a
  -> Peer ps (FlipAgency pr) NonPipelined st m b
  -> m (a, b)
runConnectedPipelinedPeersWithLimits createChannels tracer codec slimits tlimits client server =
    createChannels >>= \(clientChannel, serverChannel) ->

    (fst <$> runPipelinedPeerWithLimits
                     tracerClient codec slimits tlimits
                                        clientChannel client)
      `concurrently`
    (fst <$> runPeer tracerServer codec serverChannel server)
  where
    -- tag every trace event with the side that produced it
    tracerClient :: Tracer m (TraceSendRecv ps)
    tracerClient = contramap ((,) Client) tracer

    tracerServer :: Tracer m (TraceSendRecv ps)
    tracerServer = contramap ((,) Server) tracer