{-# LANGUAGE GADTs               #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Ouroboros.Network.Protocol.BlockFetch.Examples where

import Control.Monad (unless)
import Data.Functor (($>))
import Data.Maybe (fromMaybe)
import Pipes qualified

import Control.Concurrent.Class.MonadSTM.Strict

import Network.TypedProtocol.Pipelined

import Ouroboros.Network.Mock.Chain (Chain, HasHeader, Point)
import Ouroboros.Network.Mock.Chain qualified as Chain

import Ouroboros.Network.Protocol.BlockFetch.Client
import Ouroboros.Network.Protocol.BlockFetch.Server
import Ouroboros.Network.Protocol.BlockFetch.Type (ChainRange (..))

-- | A 'BlockFetchReceiver' whose behaviour never changes: each received block
-- is passed to the same handler, and the very same receiver is then used for
-- whatever comes next.
--
constantBlockFetchReceiver
  :: Functor m
  => (block -> m ())   -- ^ handle block
  -> m ()              -- ^ handle `MsgBatchDone`
  -> BlockFetchReceiver block m
constantBlockFetchReceiver onBlock handleBatchDone = receiver
  where
    -- The receiver refers to itself, so a single value is shared between all
    -- steps instead of being rebuilt after every block.
    receiver =
      BlockFetchReceiver {
        -- run the block handler, then carry on with the same receiver
        handleBlock = \block -> onBlock block $> receiver,
        -- field pun: the end-of-batch action is used unchanged
        handleBatchDone
      }

-- | A @'BlockFetchClient'@ designed for testing, which accumulates incoming
-- blocks in a @'StrictTVar'@, which is read on termination.
--
-- One request is issued per element of the given list, in list order.  The
-- client finishes only after, for every request, either the batch-done
-- handler or the no-blocks handler has run.
--
-- Returns a list of bodies received from the server, from the newest to
-- oldest.
--
blockFetchClientMap
  :: forall block point m.
     MonadSTM m
  => [ChainRange point]
  -> BlockFetchClient block point m [block]
blockFetchClientMap ranges = BlockFetchClient $ do
  -- received blocks, most recently received first
  var     <- newTVarIO []
  -- number of requests which have not been answered yet
  donevar <- newTVarIO (length ranges)
  let -- mark one request as answered
      requestAnswered = atomically (modifyTVar donevar pred)

      blockFetchResponse = BlockFetchResponse {
        handleStartBatch =
          return $ constantBlockFetchReceiver
            (\block -> atomically (modifyTVar var (block :)))
            requestAnswered,
        handleNoBlocks = requestAnswered
      }
  goBlockFetch donevar var ranges blockFetchResponse
 where
  goBlockFetch
    :: StrictTVar m Int
    -> StrictTVar m [block]
    -> [ChainRange point]
    -> BlockFetchResponse block m [block]
    -> m (BlockFetchRequest block point m [block])

  goBlockFetch donevar var []       _response = do
    -- wait for all responses to be fulfilled
    atomically $ do
      x <- readTVar donevar
      unless (x <= 0) retry
    SendMsgClientDone <$> atomically (readTVar var)

  goBlockFetch donevar var (r : rs) response  =
    -- request the first range; the continuation handles the remaining ones
    -- with the same response handlers
    return $ SendMsgRequestRange r response
               (BlockFetchClient $ goBlockFetch donevar var rs response)

--
-- Pipelined clients of the block-fetch protocol
--

-- | A pipelined block-fetch client which eagerly sends a list of requests.
-- This presents maximum pipelining and presents minimum choice to the
-- environment (drivers).
--
-- All requests are sent before any reply is collected; the replies are then
-- collected one by one until none is outstanding.
--
-- It returns the interleaving of `ChainRange point` requests and list of
-- received block bodies in the order from newest to oldest (received block
-- bodies are also ordered in this way).
--
blockFetchClientPipelinedMax
  :: forall block point m.
     Monad m
  => [ChainRange point]
  -> BlockFetchClientPipelined block point m [Either (ChainRange point) [block]]
blockFetchClientPipelinedMax ranges0 =
  BlockFetchClientPipelined (go [] ranges0 Zero)
 where
  -- 'acc' is the result built so far (newest entry first); the 'Nat' tracks
  -- the number of outstanding requests.
  go :: [Either (ChainRange point) [block]] -> [ChainRange point] -> Nat o
     -> BlockFetchSender o [block] block point m [Either (ChainRange point) [block]]
  -- requests left: send the next one straight away
  go acc (req : reqs) o        = SendMsgRequestRangePipelined
                                    req
                                    []
                                    -- prepend a received block, if any, to
                                    -- the blocks collected for this request
                                    (\mBlock c -> return (maybe c (: c) mBlock))
                                    (go (Left req : acc) reqs (Succ o))
  -- nothing left to send: collect an outstanding reply
  go acc []           (Succ o) = CollectBlocksPipelined
                                    Nothing
                                    (\bs -> go (Right bs : acc) [] o)
  -- nothing left to send and nothing outstanding: done
  go acc []           Zero     = SendMsgDonePipelined acc


-- | A pipelined block-fetch client that sends eagerly but always tries to
-- collect any replies as soon as they are available.  This keeps pipelining to
-- bare minimum, and gives maximum choice to the environment (drivers).
--
-- It returns the interleaving of `ChainRange point` requests and list of
-- received block bodies in the order from newest to oldest (received block
-- bodies are also ordered in this way).
--
blockFetchClientPipelinedMin
  :: forall block point m.
     Monad m
  => [ChainRange point]
  -> BlockFetchClientPipelined block point m [Either (ChainRange point) [block]]
blockFetchClientPipelinedMin ranges0 =
  BlockFetchClientPipelined (go [] ranges0 Zero)
 where
  -- 'acc' is the result built so far (newest entry first); the 'Nat' tracks
  -- the number of outstanding requests.
  go :: [Either (ChainRange point) [block]]
     -> [ChainRange point]
     -> Nat n
     -> BlockFetchSender n [block] block point m
                         [Either (ChainRange point) [block]]
  -- nothing left to send: collect an outstanding reply
  go acc []              (Succ n) = CollectBlocksPipelined
                                      Nothing
                                      (\bs -> go (Right bs : acc) [] n)
  -- requests left and replies outstanding: supply both the "send the next
  -- request" and the "collect a reply" continuations
  go acc rs@(req : reqs) (Succ n) = CollectBlocksPipelined
                                      (Just $ requestMore acc req reqs (Succ n))
                                      (\bs -> go (Right bs : acc) rs n)
  -- nothing outstanding: the only option is to send the next request
  go acc (req : reqs)    Zero     = requestMore acc req reqs Zero
  -- nothing left to send and nothing outstanding: done
  go acc []              Zero     = SendMsgDonePipelined acc

  -- Send one request, record it in the result and continue with one more
  -- outstanding request.
  requestMore :: [Either (ChainRange point) [block]]
              -> ChainRange point -> [ChainRange point]
              -> Nat n
              -> BlockFetchSender n [block] block point m
                                  [Either (ChainRange point) [block]]
  requestMore acc req reqs n = SendMsgRequestRangePipelined
                                req
                                []
                                -- prepend a received block, if any, to the
                                -- blocks collected for this request
                                (\mBlock c -> return (maybe c (: c) mBlock))
                                (go (Left req : acc) reqs (Succ n))

-- | A pipelined block-fetch client that sends eagerly up to some maximum limit
-- of outstanding requests. It is also always ready to collect any replies if
-- they are available.  This allows limited pipelining and correspondingly
-- limited choice to the environment (drivers).
--
-- When no request is outstanding the next request is sent regardless of the
-- limit, so a limit of zero or less still makes progress with one request
-- outstanding at a time.
--
-- It returns the interleaving of `ChainRange point` requests and list of
-- received block bodies in the order from newest to oldest (received block
-- bodies are also ordered in this way).
--
blockFetchClientPipelinedLimited
  :: forall block point m.
     Monad m
  => Int
  -> [ChainRange point]
  -> BlockFetchClientPipelined block point m [Either (ChainRange point) [block]]
blockFetchClientPipelinedLimited omax ranges0 =
  BlockFetchClientPipelined (go [] ranges0 Zero)
 where
  -- 'acc' is the result built so far (newest entry first); the 'Nat' tracks
  -- the number of outstanding requests.
  go :: [Either (ChainRange point) [block]]
     -> [ChainRange point]
     -> Nat n
     -> BlockFetchSender n [block] block point m
                         [Either (ChainRange point) [block]]
  -- nothing left to send: collect an outstanding reply
  go acc []              (Succ n) = CollectBlocksPipelined
                                      Nothing
                                      (\bs -> go (Right bs : acc) [] n)

  -- requests left and replies outstanding: sending another request is only
  -- offered while the number of outstanding requests is below the limit
  go acc rs@(req : reqs) (Succ n) = CollectBlocksPipelined
                                      (if int (Succ n) < omax
                                        then Just $ requestMore acc req reqs (Succ n)
                                        else Nothing)
                                      (\bs -> go (Right bs : acc) rs n)

  -- nothing outstanding: send the next request, whatever the limit is
  go acc (req : reqs) Zero        = requestMore acc req reqs Zero

  -- nothing left to send and nothing outstanding: done
  go acc []           Zero        = SendMsgDonePipelined acc

  -- Send one request, record it in the result and continue with one more
  -- outstanding request.
  requestMore :: [Either (ChainRange point) [block]]
              -> ChainRange point -> [ChainRange point]
              -> Nat n
              -> BlockFetchSender n [block] block point m
                                  [Either (ChainRange point) [block]]
  requestMore acc req reqs n = SendMsgRequestRangePipelined
                                req
                                []
                                -- prepend a received block, if any, to the
                                -- blocks collected for this request
                                (\mBlock c -> return (maybe c (: c) mBlock))
                                (go (Left req : acc) reqs (Succ n))

  -- this isn't supposed to be efficient, it's just for the example
  int :: Nat n -> Int
  int Zero     = 0
  int (Succ n) = succ (int n)


--
-- Server side of the block-fetch protocol
--

-- | A recursive control data type which encodes a succession of @'ChainRange'
-- block@ requests.
--
-- Running it on a requested range gives a producer of blocks; the value the
-- producer returns when it finishes is the @'RangeRequests'@ to use for the
-- next request.
--
newtype RangeRequests m block = RangeRequests {
    -- | Answer a single range request.
    runRangeRequest :: ChainRange (Point block)
                    -> Pipes.Producer block m (RangeRequests m block)
  }

-- | A constant @'RangeRequests'@ object.
--
-- Every request is answered by the same function @f@: once the producer
-- built by @f@ finishes, its unit result is replaced (via '$>') by
-- @constantRangeRequests f@ again, so the handler never changes between
-- requests.
--
constantRangeRequests
  :: Monad m
  => (ChainRange (Point block) -> Pipes.Producer block m ())
  -> RangeRequests m block
constantRangeRequests f =
  RangeRequests (\range -> f range $> constantRangeRequests f)

-- | @'RangeRequests'@ which serves blocks from a fixed chain.
--
-- Each requested range is looked up with 'Chain.selectBlockRange'; the
-- selected blocks are emitted in the order that function returns them.  When
-- it returns 'Nothing' the producer emits no blocks at all.
--
-- NOTE(review): the previous comment suggested using a @'Functor'@ instance
-- of @'RangeRequests'@ to map over @block@; no such instance is declared
-- alongside the newtype, so confirm it exists before relying on it.
--
rangeRequestsFromChain
  :: ( Monad m
     , HasHeader block
     )
  => Chain block
  -> RangeRequests m block
rangeRequestsFromChain chain = constantRangeRequests $ \(ChainRange from to) ->
  Pipes.each $ fromMaybe [] $ Chain.selectBlockRange chain from to

-- | Construct a @'BlockFetchServer'@ from a @'RangeRequests'@ control data
-- type.
--
-- For every incoming range the wrapped handler is run to obtain a producer of
-- blocks.  If the producer finishes without yielding anything the server
-- answers with 'SendMsgNoBlocks'; otherwise it starts a batch
-- ('SendMsgStartBatch'), sends each yielded block with 'SendMsgBlock' and
-- closes the batch with 'SendMsgBatchDone'.  In both cases the
-- @'RangeRequests'@ returned by the finished producer is used to build the
-- server for the next request.  The server's final result is @()@.
--
blockFetchServer
  :: forall m block.
     Monad m
  => RangeRequests m block
  -> BlockFetchServer block (Point block) m ()
blockFetchServer (RangeRequests rangeRequest) = BlockFetchServer handleRequest ()
 where
  -- Decide between "no blocks" and "start batch" by pulling the first
  -- element out of the producer for the requested range.
  handleRequest
    :: ChainRange (Point block)
    -> m (BlockFetchBlockSender block (Point block) m ())
  handleRequest range = do
    stream <- Pipes.next $ rangeRequest range
    case stream of
      -- The producer ended immediately: nothing to send for this range.
      Left rangeRequest'     ->
        return $ SendMsgNoBlocks (return $ blockFetchServer rangeRequest')
      -- At least one block is available: open a batch starting with it.
      Right (block', stream') ->
        return $ SendMsgStartBatch (sendStream block' stream')

  -- Send the given block, then keep draining the rest of the producer until
  -- it is exhausted, at which point the batch is closed.
  sendStream
    :: block
    -> Pipes.Producer block m (RangeRequests m block)
    -> m (BlockFetchSendBlocks block (Point block) m ())
  sendStream block stream =
    return $ SendMsgBlock block $ do
      next <- Pipes.next stream
      case next of
        Left rangeRequest' ->
          return $ SendMsgBatchDone (return $ blockFetchServer rangeRequest')
        Right (block', stream') ->
          sendStream block' stream'