{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
module Ouroboros.Network.Protocol.BlockFetch.Examples where
import Control.Monad (unless)
import Data.Functor (($>))
import Data.Maybe (fromMaybe)
import Pipes qualified
import Control.Concurrent.Class.MonadSTM.Strict
import Network.TypedProtocol.Peer
import Ouroboros.Network.Mock.Chain (Chain, HasHeader, Point)
import Ouroboros.Network.Mock.Chain qualified as Chain
import Ouroboros.Network.Protocol.BlockFetch.Client
import Ouroboros.Network.Protocol.BlockFetch.Server
import Ouroboros.Network.Protocol.BlockFetch.Type (ChainRange (..))
constantBlockFetchReceiver
:: Functor m
=> (block -> m ())
-> m ()
-> BlockFetchReceiver block m
constantBlockFetchReceiver :: forall (m :: * -> *) block.
Functor m =>
(block -> m ()) -> m () -> BlockFetchReceiver block m
constantBlockFetchReceiver block -> m ()
onBlock m ()
handleBatchDone =
BlockFetchReceiver {
handleBlock :: block -> m (BlockFetchReceiver block m)
handleBlock = \block
block -> block -> m ()
onBlock block
block m ()
-> BlockFetchReceiver block m -> m (BlockFetchReceiver block m)
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$>
(block -> m ()) -> m () -> BlockFetchReceiver block m
forall (m :: * -> *) block.
Functor m =>
(block -> m ()) -> m () -> BlockFetchReceiver block m
constantBlockFetchReceiver block -> m ()
onBlock m ()
handleBatchDone,
m ()
handleBatchDone :: m ()
handleBatchDone :: m ()
handleBatchDone
}
blockFetchClientMap
:: forall block point m.
MonadSTM m
=> [ChainRange point]
-> BlockFetchClient block point m [block]
blockFetchClientMap :: forall block point (m :: * -> *).
MonadSTM m =>
[ChainRange point] -> BlockFetchClient block point m [block]
blockFetchClientMap [ChainRange point]
ranges = m (BlockFetchRequest block point m [block])
-> BlockFetchClient block point m [block]
forall block point (m :: * -> *) a.
m (BlockFetchRequest block point m a)
-> BlockFetchClient block point m a
BlockFetchClient (m (BlockFetchRequest block point m [block])
-> BlockFetchClient block point m [block])
-> m (BlockFetchRequest block point m [block])
-> BlockFetchClient block point m [block]
forall a b. (a -> b) -> a -> b
$ do
var <- [block] -> m (StrictTVar m [block])
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO []
donevar <- newTVarIO (length ranges)
let blockFetchResponse = BlockFetchResponse {
handleStartBatch :: m (BlockFetchReceiver block m)
handleStartBatch =
BlockFetchReceiver block m -> m (BlockFetchReceiver block m)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (BlockFetchReceiver block m -> m (BlockFetchReceiver block m))
-> BlockFetchReceiver block m -> m (BlockFetchReceiver block m)
forall a b. (a -> b) -> a -> b
$ (block -> m ()) -> m () -> BlockFetchReceiver block m
forall (m :: * -> *) block.
Functor m =>
(block -> m ()) -> m () -> BlockFetchReceiver block m
constantBlockFetchReceiver
(\block
block -> STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (StrictTVar m [block] -> ([block] -> [block]) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> (a -> a) -> STM m ()
modifyTVar StrictTVar m [block]
var (block
block block -> [block] -> [block]
forall a. a -> [a] -> [a]
:)))
(STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (StrictTVar m Int -> (Int -> Int) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> (a -> a) -> STM m ()
modifyTVar StrictTVar m Int
donevar Int -> Int
forall a. Enum a => a -> a
pred)),
handleNoBlocks :: m ()
handleNoBlocks = do
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTVar m Int -> (Int -> Int) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> (a -> a) -> STM m ()
modifyTVar StrictTVar m Int
donevar Int -> Int
forall a. Enum a => a -> a
pred
() -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
}
goBlockFetch donevar var ranges blockFetchResponse
where
goBlockFetch
:: StrictTVar m Int
-> StrictTVar m [block]
-> [ChainRange point]
-> BlockFetchResponse block m [block]
-> m (BlockFetchRequest block point m [block])
goBlockFetch :: StrictTVar m Int
-> StrictTVar m [block]
-> [ChainRange point]
-> BlockFetchResponse block m [block]
-> m (BlockFetchRequest block point m [block])
goBlockFetch StrictTVar m Int
donevar StrictTVar m [block]
var [] BlockFetchResponse block m [block]
_response = do
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
x <- StrictTVar m Int -> STM m Int
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m Int
donevar
unless (x <= 0) retry
[block] -> BlockFetchRequest block point m [block]
forall a block point (m :: * -> *).
a -> BlockFetchRequest block point m a
SendMsgClientDone ([block] -> BlockFetchRequest block point m [block])
-> m [block] -> m (BlockFetchRequest block point m [block])
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StrictTVar m [block] -> m [block]
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> m a
readTVarIO StrictTVar m [block]
var
goBlockFetch StrictTVar m Int
donevar StrictTVar m [block]
var (ChainRange point
r : [ChainRange point]
rs) BlockFetchResponse block m [block]
response =
BlockFetchRequest block point m [block]
-> m (BlockFetchRequest block point m [block])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (BlockFetchRequest block point m [block]
-> m (BlockFetchRequest block point m [block]))
-> BlockFetchRequest block point m [block]
-> m (BlockFetchRequest block point m [block])
forall a b. (a -> b) -> a -> b
$ ChainRange point
-> BlockFetchResponse block m [block]
-> BlockFetchClient block point m [block]
-> BlockFetchRequest block point m [block]
forall point block (m :: * -> *) a.
ChainRange point
-> BlockFetchResponse block m a
-> BlockFetchClient block point m a
-> BlockFetchRequest block point m a
SendMsgRequestRange ChainRange point
r BlockFetchResponse block m [block]
response (m (BlockFetchRequest block point m [block])
-> BlockFetchClient block point m [block]
forall block point (m :: * -> *) a.
m (BlockFetchRequest block point m a)
-> BlockFetchClient block point m a
BlockFetchClient (m (BlockFetchRequest block point m [block])
-> BlockFetchClient block point m [block])
-> m (BlockFetchRequest block point m [block])
-> BlockFetchClient block point m [block]
forall a b. (a -> b) -> a -> b
$ StrictTVar m Int
-> StrictTVar m [block]
-> [ChainRange point]
-> BlockFetchResponse block m [block]
-> m (BlockFetchRequest block point m [block])
goBlockFetch StrictTVar m Int
donevar StrictTVar m [block]
var [ChainRange point]
rs BlockFetchResponse block m [block]
response)
blockFetchClientPipelinedMax
:: forall block point m.
Monad m
=> [ChainRange point]
-> BlockFetchClientPipelined block point m [Either (ChainRange point) [block]]
blockFetchClientPipelinedMax :: forall block point (m :: * -> *).
Monad m =>
[ChainRange point]
-> BlockFetchClientPipelined
block point m [Either (ChainRange point) [block]]
blockFetchClientPipelinedMax [ChainRange point]
ranges0 =
BlockFetchSender
'Z [block] block point m [Either (ChainRange point) [block]]
-> BlockFetchClientPipelined
block point m [Either (ChainRange point) [block]]
forall c block point (m :: * -> *) a.
BlockFetchSender 'Z c block point m a
-> BlockFetchClientPipelined block point m a
BlockFetchClientPipelined ([Either (ChainRange point) [block]]
-> [ChainRange point]
-> Nat 'Z
-> BlockFetchSender
'Z [block] block point m [Either (ChainRange point) [block]]
forall (o :: N).
[Either (ChainRange point) [block]]
-> [ChainRange point]
-> Nat o
-> BlockFetchSender
o [block] block point m [Either (ChainRange point) [block]]
go [] [ChainRange point]
ranges0 Nat 'Z
forall (n :: N). ('Z ~ n) => Nat n
Zero)
where
go :: [Either (ChainRange point) [block]] -> [ChainRange point] -> Nat o
-> BlockFetchSender o [block] block point m [Either (ChainRange point) [block]]
go :: forall (o :: N).
[Either (ChainRange point) [block]]
-> [ChainRange point]
-> Nat o
-> BlockFetchSender
o [block] block point m [Either (ChainRange point) [block]]
go [Either (ChainRange point) [block]]
acc (ChainRange point
req : [ChainRange point]
reqs) Nat o
o = ChainRange point
-> [block]
-> (Maybe block -> [block] -> m [block])
-> BlockFetchSender
('S o) [block] block point m [Either (ChainRange point) [block]]
-> BlockFetchSender
o [block] block point m [Either (ChainRange point) [block]]
forall point c block (m :: * -> *) (n :: N) a.
ChainRange point
-> c
-> (Maybe block -> c -> m c)
-> BlockFetchSender ('S n) c block point m a
-> BlockFetchSender n c block point m a
SendMsgRequestRangePipelined
ChainRange point
req
[]
(\Maybe block
mBlock [block]
c -> case Maybe block
mBlock of
Maybe block
Nothing -> [block] -> m [block]
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return [block]
c
Just block
b -> [block] -> m [block]
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (block
b block -> [block] -> [block]
forall a. a -> [a] -> [a]
: [block]
c))
([Either (ChainRange point) [block]]
-> [ChainRange point]
-> Nat ('S o)
-> BlockFetchSender
('S o) [block] block point m [Either (ChainRange point) [block]]
forall (o :: N).
[Either (ChainRange point) [block]]
-> [ChainRange point]
-> Nat o
-> BlockFetchSender
o [block] block point m [Either (ChainRange point) [block]]
go (ChainRange point -> Either (ChainRange point) [block]
forall a b. a -> Either a b
Left ChainRange point
req Either (ChainRange point) [block]
-> [Either (ChainRange point) [block]]
-> [Either (ChainRange point) [block]]
forall a. a -> [a] -> [a]
: [Either (ChainRange point) [block]]
acc) [ChainRange point]
reqs (Nat o -> Nat ('S o)
forall (m :: N) (n :: N). (m ~ 'S n) => Nat n -> Nat m
Succ Nat o
o))
go [Either (ChainRange point) [block]]
acc [] (Succ Nat n
o) = Maybe
(BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]])
-> ([block]
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]])
-> BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]]
forall (n1 :: N) c block point (m :: * -> *) a.
Maybe (BlockFetchSender ('S n1) c block point m a)
-> (c -> BlockFetchSender n1 c block point m a)
-> BlockFetchSender ('S n1) c block point m a
CollectBlocksPipelined
Maybe
(BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]])
forall a. Maybe a
Nothing
(\[block]
bs -> [Either (ChainRange point) [block]]
-> [ChainRange point]
-> Nat n
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]]
forall (o :: N).
[Either (ChainRange point) [block]]
-> [ChainRange point]
-> Nat o
-> BlockFetchSender
o [block] block point m [Either (ChainRange point) [block]]
go ([block] -> Either (ChainRange point) [block]
forall a b. b -> Either a b
Right [block]
bs Either (ChainRange point) [block]
-> [Either (ChainRange point) [block]]
-> [Either (ChainRange point) [block]]
forall a. a -> [a] -> [a]
: [Either (ChainRange point) [block]]
acc) [] Nat n
o)
go [Either (ChainRange point) [block]]
acc [] Nat o
Zero = [Either (ChainRange point) [block]]
-> BlockFetchSender
'Z [block] block point m [Either (ChainRange point) [block]]
forall a c block point (m :: * -> *).
a -> BlockFetchSender 'Z c block point m a
SendMsgDonePipelined [Either (ChainRange point) [block]]
acc
blockFetchClientPipelinedMin
:: forall block point m.
Monad m
=> [ChainRange point]
-> BlockFetchClientPipelined block point m [Either (ChainRange point) [block]]
blockFetchClientPipelinedMin :: forall block point (m :: * -> *).
Monad m =>
[ChainRange point]
-> BlockFetchClientPipelined
block point m [Either (ChainRange point) [block]]
blockFetchClientPipelinedMin [ChainRange point]
ranges0 =
BlockFetchSender
'Z [block] block point m [Either (ChainRange point) [block]]
-> BlockFetchClientPipelined
block point m [Either (ChainRange point) [block]]
forall c block point (m :: * -> *) a.
BlockFetchSender 'Z c block point m a
-> BlockFetchClientPipelined block point m a
BlockFetchClientPipelined ([Either (ChainRange point) [block]]
-> [ChainRange point]
-> Nat 'Z
-> BlockFetchSender
'Z [block] block point m [Either (ChainRange point) [block]]
forall (n :: N).
[Either (ChainRange point) [block]]
-> [ChainRange point]
-> Nat n
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]]
go [] [ChainRange point]
ranges0 Nat 'Z
forall (n :: N). ('Z ~ n) => Nat n
Zero)
where
go :: [Either (ChainRange point) [block]]
-> [ChainRange point]
-> Nat n
-> BlockFetchSender n [block] block point m
[Either (ChainRange point) [block]]
go :: forall (n :: N).
[Either (ChainRange point) [block]]
-> [ChainRange point]
-> Nat n
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]]
go [Either (ChainRange point) [block]]
acc [] (Succ Nat n
n) = Maybe
(BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]])
-> ([block]
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]])
-> BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]]
forall (n1 :: N) c block point (m :: * -> *) a.
Maybe (BlockFetchSender ('S n1) c block point m a)
-> (c -> BlockFetchSender n1 c block point m a)
-> BlockFetchSender ('S n1) c block point m a
CollectBlocksPipelined
Maybe
(BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]])
forall a. Maybe a
Nothing
(\[block]
bs -> [Either (ChainRange point) [block]]
-> [ChainRange point]
-> Nat n
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]]
forall (n :: N).
[Either (ChainRange point) [block]]
-> [ChainRange point]
-> Nat n
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]]
go ([block] -> Either (ChainRange point) [block]
forall a b. b -> Either a b
Right [block]
bs Either (ChainRange point) [block]
-> [Either (ChainRange point) [block]]
-> [Either (ChainRange point) [block]]
forall a. a -> [a] -> [a]
: [Either (ChainRange point) [block]]
acc) [] Nat n
n)
go [Either (ChainRange point) [block]]
acc (ChainRange point
req : [ChainRange point]
reqs) (Succ Nat n
n) = Maybe
(BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]])
-> ([block]
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]])
-> BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]]
forall (n1 :: N) c block point (m :: * -> *) a.
Maybe (BlockFetchSender ('S n1) c block point m a)
-> (c -> BlockFetchSender n1 c block point m a)
-> BlockFetchSender ('S n1) c block point m a
CollectBlocksPipelined
(BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]]
-> Maybe
(BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]])
forall a. a -> Maybe a
Just (BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]]
-> Maybe
(BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]]))
-> BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]]
-> Maybe
(BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]])
forall a b. (a -> b) -> a -> b
$ [Either (ChainRange point) [block]]
-> ChainRange point
-> [ChainRange point]
-> Nat ('S n)
-> BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]]
forall (n :: N).
[Either (ChainRange point) [block]]
-> ChainRange point
-> [ChainRange point]
-> Nat n
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]]
requestMore [Either (ChainRange point) [block]]
acc ChainRange point
req [ChainRange point]
reqs (Nat n -> Nat ('S n)
forall (m :: N) (n :: N). (m ~ 'S n) => Nat n -> Nat m
Succ Nat n
n))
(\[block]
bs -> [Either (ChainRange point) [block]]
-> [ChainRange point]
-> Nat n
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]]
forall (n :: N).
[Either (ChainRange point) [block]]
-> [ChainRange point]
-> Nat n
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]]
go ([block] -> Either (ChainRange point) [block]
forall a b. b -> Either a b
Right [block]
bs Either (ChainRange point) [block]
-> [Either (ChainRange point) [block]]
-> [Either (ChainRange point) [block]]
forall a. a -> [a] -> [a]
: [Either (ChainRange point) [block]]
acc) (ChainRange point
req ChainRange point -> [ChainRange point] -> [ChainRange point]
forall a. a -> [a] -> [a]
: [ChainRange point]
reqs) Nat n
n)
go [Either (ChainRange point) [block]]
acc (ChainRange point
req : [ChainRange point]
reqs) Nat n
Zero = [Either (ChainRange point) [block]]
-> ChainRange point
-> [ChainRange point]
-> Nat n
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]]
forall (n :: N).
[Either (ChainRange point) [block]]
-> ChainRange point
-> [ChainRange point]
-> Nat n
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]]
requestMore [Either (ChainRange point) [block]]
acc ChainRange point
req [ChainRange point]
reqs Nat n
forall (n :: N). ('Z ~ n) => Nat n
Zero
go [Either (ChainRange point) [block]]
acc [] Nat n
Zero = [Either (ChainRange point) [block]]
-> BlockFetchSender
'Z [block] block point m [Either (ChainRange point) [block]]
forall a c block point (m :: * -> *).
a -> BlockFetchSender 'Z c block point m a
SendMsgDonePipelined [Either (ChainRange point) [block]]
acc
requestMore :: [Either (ChainRange point) [block]]
-> ChainRange point -> [ChainRange point]
-> Nat n
-> BlockFetchSender n [block] block point m
[Either (ChainRange point) [block]]
requestMore :: forall (n :: N).
[Either (ChainRange point) [block]]
-> ChainRange point
-> [ChainRange point]
-> Nat n
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]]
requestMore [Either (ChainRange point) [block]]
acc ChainRange point
req [ChainRange point]
reqs Nat n
n = ChainRange point
-> [block]
-> (Maybe block -> [block] -> m [block])
-> BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]]
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]]
forall point c block (m :: * -> *) (n :: N) a.
ChainRange point
-> c
-> (Maybe block -> c -> m c)
-> BlockFetchSender ('S n) c block point m a
-> BlockFetchSender n c block point m a
SendMsgRequestRangePipelined
ChainRange point
req
[]
(\Maybe block
mBlock [block]
c -> case Maybe block
mBlock of
Maybe block
Nothing -> [block] -> m [block]
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return [block]
c
Just block
b -> [block] -> m [block]
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (block
b block -> [block] -> [block]
forall a. a -> [a] -> [a]
: [block]
c))
([Either (ChainRange point) [block]]
-> [ChainRange point]
-> Nat ('S n)
-> BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]]
forall (n :: N).
[Either (ChainRange point) [block]]
-> [ChainRange point]
-> Nat n
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]]
go (ChainRange point -> Either (ChainRange point) [block]
forall a b. a -> Either a b
Left ChainRange point
req Either (ChainRange point) [block]
-> [Either (ChainRange point) [block]]
-> [Either (ChainRange point) [block]]
forall a. a -> [a] -> [a]
: [Either (ChainRange point) [block]]
acc) [ChainRange point]
reqs (Nat n -> Nat ('S n)
forall (m :: N) (n :: N). (m ~ 'S n) => Nat n -> Nat m
Succ Nat n
n))
blockFetchClientPipelinedLimited
:: forall block point m.
Monad m
=> Int
-> [ChainRange point]
-> BlockFetchClientPipelined block point m [Either (ChainRange point) [block]]
blockFetchClientPipelinedLimited :: forall block point (m :: * -> *).
Monad m =>
Int
-> [ChainRange point]
-> BlockFetchClientPipelined
block point m [Either (ChainRange point) [block]]
blockFetchClientPipelinedLimited Int
omax [ChainRange point]
ranges0 =
BlockFetchSender
'Z [block] block point m [Either (ChainRange point) [block]]
-> BlockFetchClientPipelined
block point m [Either (ChainRange point) [block]]
forall c block point (m :: * -> *) a.
BlockFetchSender 'Z c block point m a
-> BlockFetchClientPipelined block point m a
BlockFetchClientPipelined ([Either (ChainRange point) [block]]
-> [ChainRange point]
-> Nat 'Z
-> BlockFetchSender
'Z [block] block point m [Either (ChainRange point) [block]]
forall (n :: N).
[Either (ChainRange point) [block]]
-> [ChainRange point]
-> Nat n
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]]
go [] [ChainRange point]
ranges0 Nat 'Z
forall (n :: N). ('Z ~ n) => Nat n
Zero)
where
go :: [Either (ChainRange point) [block]]
-> [ChainRange point]
-> Nat n
-> BlockFetchSender n [block] block point m
[Either (ChainRange point) [block]]
go :: forall (n :: N).
[Either (ChainRange point) [block]]
-> [ChainRange point]
-> Nat n
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]]
go [Either (ChainRange point) [block]]
acc [] (Succ Nat n
n) = Maybe
(BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]])
-> ([block]
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]])
-> BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]]
forall (n1 :: N) c block point (m :: * -> *) a.
Maybe (BlockFetchSender ('S n1) c block point m a)
-> (c -> BlockFetchSender n1 c block point m a)
-> BlockFetchSender ('S n1) c block point m a
CollectBlocksPipelined
Maybe
(BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]])
forall a. Maybe a
Nothing
(\[block]
bs -> [Either (ChainRange point) [block]]
-> [ChainRange point]
-> Nat n
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]]
forall (n :: N).
[Either (ChainRange point) [block]]
-> [ChainRange point]
-> Nat n
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]]
go ([block] -> Either (ChainRange point) [block]
forall a b. b -> Either a b
Right [block]
bs Either (ChainRange point) [block]
-> [Either (ChainRange point) [block]]
-> [Either (ChainRange point) [block]]
forall a. a -> [a] -> [a]
: [Either (ChainRange point) [block]]
acc) [] Nat n
n)
go [Either (ChainRange point) [block]]
acc rs :: [ChainRange point]
rs@(ChainRange point
req : [ChainRange point]
reqs) (Succ Nat n
n) = Maybe
(BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]])
-> ([block]
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]])
-> BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]]
forall (n1 :: N) c block point (m :: * -> *) a.
Maybe (BlockFetchSender ('S n1) c block point m a)
-> (c -> BlockFetchSender n1 c block point m a)
-> BlockFetchSender ('S n1) c block point m a
CollectBlocksPipelined
(if Nat ('S n) -> Int
forall (n :: N). Nat n -> Int
int (Nat n -> Nat ('S n)
forall (m :: N) (n :: N). (m ~ 'S n) => Nat n -> Nat m
Succ Nat n
n) Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
omax
then BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]]
-> Maybe
(BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]])
forall a. a -> Maybe a
Just (BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]]
-> Maybe
(BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]]))
-> BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]]
-> Maybe
(BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]])
forall a b. (a -> b) -> a -> b
$ [Either (ChainRange point) [block]]
-> ChainRange point
-> [ChainRange point]
-> Nat ('S n)
-> BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]]
forall (n :: N).
[Either (ChainRange point) [block]]
-> ChainRange point
-> [ChainRange point]
-> Nat n
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]]
requestMore [Either (ChainRange point) [block]]
acc ChainRange point
req [ChainRange point]
reqs (Nat n -> Nat ('S n)
forall (m :: N) (n :: N). (m ~ 'S n) => Nat n -> Nat m
Succ Nat n
n)
else Maybe
(BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]])
forall a. Maybe a
Nothing)
(\[block]
bs -> [Either (ChainRange point) [block]]
-> [ChainRange point]
-> Nat n
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]]
forall (n :: N).
[Either (ChainRange point) [block]]
-> [ChainRange point]
-> Nat n
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]]
go ([block] -> Either (ChainRange point) [block]
forall a b. b -> Either a b
Right [block]
bs Either (ChainRange point) [block]
-> [Either (ChainRange point) [block]]
-> [Either (ChainRange point) [block]]
forall a. a -> [a] -> [a]
: [Either (ChainRange point) [block]]
acc) [ChainRange point]
rs Nat n
n)
go [Either (ChainRange point) [block]]
acc (ChainRange point
req : [ChainRange point]
reqs) Nat n
Zero = [Either (ChainRange point) [block]]
-> ChainRange point
-> [ChainRange point]
-> Nat n
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]]
forall (n :: N).
[Either (ChainRange point) [block]]
-> ChainRange point
-> [ChainRange point]
-> Nat n
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]]
requestMore [Either (ChainRange point) [block]]
acc ChainRange point
req [ChainRange point]
reqs Nat n
forall (n :: N). ('Z ~ n) => Nat n
Zero
go [Either (ChainRange point) [block]]
acc [] Nat n
Zero = [Either (ChainRange point) [block]]
-> BlockFetchSender
'Z [block] block point m [Either (ChainRange point) [block]]
forall a c block point (m :: * -> *).
a -> BlockFetchSender 'Z c block point m a
SendMsgDonePipelined [Either (ChainRange point) [block]]
acc
requestMore :: [Either (ChainRange point) [block]]
-> ChainRange point -> [ChainRange point]
-> Nat n
-> BlockFetchSender n [block] block point m
[Either (ChainRange point) [block]]
requestMore :: forall (n :: N).
[Either (ChainRange point) [block]]
-> ChainRange point
-> [ChainRange point]
-> Nat n
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]]
requestMore [Either (ChainRange point) [block]]
acc ChainRange point
req [ChainRange point]
reqs Nat n
n = ChainRange point
-> [block]
-> (Maybe block -> [block] -> m [block])
-> BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]]
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]]
forall point c block (m :: * -> *) (n :: N) a.
ChainRange point
-> c
-> (Maybe block -> c -> m c)
-> BlockFetchSender ('S n) c block point m a
-> BlockFetchSender n c block point m a
SendMsgRequestRangePipelined
ChainRange point
req
[]
(\Maybe block
mBlock [block]
c -> case Maybe block
mBlock of
Maybe block
Nothing -> [block] -> m [block]
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return [block]
c
Just block
b -> [block] -> m [block]
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (block
b block -> [block] -> [block]
forall a. a -> [a] -> [a]
: [block]
c))
([Either (ChainRange point) [block]]
-> [ChainRange point]
-> Nat ('S n)
-> BlockFetchSender
('S n) [block] block point m [Either (ChainRange point) [block]]
forall (n :: N).
[Either (ChainRange point) [block]]
-> [ChainRange point]
-> Nat n
-> BlockFetchSender
n [block] block point m [Either (ChainRange point) [block]]
go (ChainRange point -> Either (ChainRange point) [block]
forall a b. a -> Either a b
Left ChainRange point
req Either (ChainRange point) [block]
-> [Either (ChainRange point) [block]]
-> [Either (ChainRange point) [block]]
forall a. a -> [a] -> [a]
: [Either (ChainRange point) [block]]
acc) [ChainRange point]
reqs (Nat n -> Nat ('S n)
forall (m :: N) (n :: N). (m ~ 'S n) => Nat n -> Nat m
Succ Nat n
n))
int :: Nat n -> Int
int :: forall (n :: N). Nat n -> Int
int Nat n
Zero = Int
0
int (Succ Nat n
n) = Int -> Int
forall a. Enum a => a -> a
succ (Nat n -> Int
forall (n :: N). Nat n -> Int
int Nat n
n)
newtype RangeRequests m block = RangeRequests {
forall (m :: * -> *) block.
RangeRequests m block
-> ChainRange (Point block)
-> Producer block m (RangeRequests m block)
runRangeRequest :: ChainRange (Point block)
-> Pipes.Producer block m (RangeRequests m block)
}
constantRangeRequests
:: Monad m
=> (ChainRange (Point block) -> Pipes.Producer block m ())
-> RangeRequests m block
constantRangeRequests :: forall (m :: * -> *) block.
Monad m =>
(ChainRange (Point block) -> Producer block m ())
-> RangeRequests m block
constantRangeRequests ChainRange (Point block) -> Producer block m ()
f = (ChainRange (Point block)
-> Producer block m (RangeRequests m block))
-> RangeRequests m block
forall (m :: * -> *) block.
(ChainRange (Point block)
-> Producer block m (RangeRequests m block))
-> RangeRequests m block
RangeRequests (\ChainRange (Point block)
range -> ChainRange (Point block) -> Producer block m ()
f ChainRange (Point block)
range Producer block m ()
-> RangeRequests m block
-> Producer block m (RangeRequests m block)
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> (ChainRange (Point block) -> Producer block m ())
-> RangeRequests m block
forall (m :: * -> *) block.
Monad m =>
(ChainRange (Point block) -> Producer block m ())
-> RangeRequests m block
constantRangeRequests ChainRange (Point block) -> Producer block m ()
f)
rangeRequestsFromChain
:: ( Monad m
, HasHeader block
)
=> Chain block
-> RangeRequests m block
rangeRequestsFromChain :: forall (m :: * -> *) block.
(Monad m, HasHeader block) =>
Chain block -> RangeRequests m block
rangeRequestsFromChain Chain block
chain = (ChainRange (Point block) -> Producer block m ())
-> RangeRequests m block
forall (m :: * -> *) block.
Monad m =>
(ChainRange (Point block) -> Producer block m ())
-> RangeRequests m block
constantRangeRequests ((ChainRange (Point block) -> Producer block m ())
-> RangeRequests m block)
-> (ChainRange (Point block) -> Producer block m ())
-> RangeRequests m block
forall a b. (a -> b) -> a -> b
$ \(ChainRange Point block
from Point block
to) ->
[block] -> Producer block m ()
forall (m :: * -> *) (f :: * -> *) a x' x.
(Functor m, Foldable f) =>
f a -> Proxy x' x () a m ()
Pipes.each ([block] -> Producer block m ()) -> [block] -> Producer block m ()
forall a b. (a -> b) -> a -> b
$ [block] -> Maybe [block] -> [block]
forall a. a -> Maybe a -> a
fromMaybe [] (Maybe [block] -> [block]) -> Maybe [block] -> [block]
forall a b. (a -> b) -> a -> b
$ Chain block -> Point block -> Point block -> Maybe [block]
forall block.
HasHeader block =>
Chain block -> Point block -> Point block -> Maybe [block]
Chain.selectBlockRange Chain block
chain Point block
from Point block
to
blockFetchServer
:: forall m block.
Monad m
=> RangeRequests m block
-> BlockFetchServer block (Point block) m ()
blockFetchServer :: forall (m :: * -> *) block.
Monad m =>
RangeRequests m block -> BlockFetchServer block (Point block) m ()
blockFetchServer (RangeRequests ChainRange (Point block)
-> Producer block m (RangeRequests m block)
rangeRequest) = (ChainRange (Point block)
-> m (BlockFetchBlockSender block (Point block) m ()))
-> () -> BlockFetchServer block (Point block) m ()
forall point (m :: * -> *) block a.
(ChainRange point -> m (BlockFetchBlockSender block point m a))
-> a -> BlockFetchServer block point m a
BlockFetchServer ChainRange (Point block)
-> m (BlockFetchBlockSender block (Point block) m ())
handleRequest ()
where
handleRequest
:: ChainRange (Point block)
-> m (BlockFetchBlockSender block (Point block) m ())
handleRequest :: ChainRange (Point block)
-> m (BlockFetchBlockSender block (Point block) m ())
handleRequest ChainRange (Point block)
range = do
stream <- Producer block m (RangeRequests m block)
-> m (Either
(RangeRequests m block)
(block, Producer block m (RangeRequests m block)))
forall (m :: * -> *) a r.
Monad m =>
Producer a m r -> m (Either r (a, Producer a m r))
Pipes.next (Producer block m (RangeRequests m block)
-> m (Either
(RangeRequests m block)
(block, Producer block m (RangeRequests m block))))
-> Producer block m (RangeRequests m block)
-> m (Either
(RangeRequests m block)
(block, Producer block m (RangeRequests m block)))
forall a b. (a -> b) -> a -> b
$ ChainRange (Point block)
-> Producer block m (RangeRequests m block)
rangeRequest ChainRange (Point block)
range
case stream of
Left RangeRequests m block
rangeRequest' ->
BlockFetchBlockSender block (Point block) m ()
-> m (BlockFetchBlockSender block (Point block) m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (BlockFetchBlockSender block (Point block) m ()
-> m (BlockFetchBlockSender block (Point block) m ()))
-> BlockFetchBlockSender block (Point block) m ()
-> m (BlockFetchBlockSender block (Point block) m ())
forall a b. (a -> b) -> a -> b
$ m (BlockFetchServer block (Point block) m ())
-> BlockFetchBlockSender block (Point block) m ()
forall (m :: * -> *) block point a.
m (BlockFetchServer block point m a)
-> BlockFetchBlockSender block point m a
SendMsgNoBlocks (BlockFetchServer block (Point block) m ()
-> m (BlockFetchServer block (Point block) m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (BlockFetchServer block (Point block) m ()
-> m (BlockFetchServer block (Point block) m ()))
-> BlockFetchServer block (Point block) m ()
-> m (BlockFetchServer block (Point block) m ())
forall a b. (a -> b) -> a -> b
$ RangeRequests m block -> BlockFetchServer block (Point block) m ()
forall (m :: * -> *) block.
Monad m =>
RangeRequests m block -> BlockFetchServer block (Point block) m ()
blockFetchServer RangeRequests m block
rangeRequest')
Right (block
block', Producer block m (RangeRequests m block)
stream') ->
BlockFetchBlockSender block (Point block) m ()
-> m (BlockFetchBlockSender block (Point block) m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (BlockFetchBlockSender block (Point block) m ()
-> m (BlockFetchBlockSender block (Point block) m ()))
-> BlockFetchBlockSender block (Point block) m ()
-> m (BlockFetchBlockSender block (Point block) m ())
forall a b. (a -> b) -> a -> b
$ m (BlockFetchSendBlocks block (Point block) m ())
-> BlockFetchBlockSender block (Point block) m ()
forall (m :: * -> *) block point a.
m (BlockFetchSendBlocks block point m a)
-> BlockFetchBlockSender block point m a
SendMsgStartBatch (block
-> Producer block m (RangeRequests m block)
-> m (BlockFetchSendBlocks block (Point block) m ())
sendStream block
block' Producer block m (RangeRequests m block)
stream')
sendStream
:: block
-> Pipes.Producer block m (RangeRequests m block)
-> m (BlockFetchSendBlocks block (Point block) m ())
sendStream :: block
-> Producer block m (RangeRequests m block)
-> m (BlockFetchSendBlocks block (Point block) m ())
sendStream block
block Producer block m (RangeRequests m block)
stream =
BlockFetchSendBlocks block (Point block) m ()
-> m (BlockFetchSendBlocks block (Point block) m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (BlockFetchSendBlocks block (Point block) m ()
-> m (BlockFetchSendBlocks block (Point block) m ()))
-> BlockFetchSendBlocks block (Point block) m ()
-> m (BlockFetchSendBlocks block (Point block) m ())
forall a b. (a -> b) -> a -> b
$ block
-> m (BlockFetchSendBlocks block (Point block) m ())
-> BlockFetchSendBlocks block (Point block) m ()
forall block (m :: * -> *) point a.
block
-> m (BlockFetchSendBlocks block point m a)
-> BlockFetchSendBlocks block point m a
SendMsgBlock block
block (m (BlockFetchSendBlocks block (Point block) m ())
-> BlockFetchSendBlocks block (Point block) m ())
-> m (BlockFetchSendBlocks block (Point block) m ())
-> BlockFetchSendBlocks block (Point block) m ()
forall a b. (a -> b) -> a -> b
$ do
next <- Producer block m (RangeRequests m block)
-> m (Either
(RangeRequests m block)
(block, Producer block m (RangeRequests m block)))
forall (m :: * -> *) a r.
Monad m =>
Producer a m r -> m (Either r (a, Producer a m r))
Pipes.next Producer block m (RangeRequests m block)
stream
case next of
Left RangeRequests m block
rangeRequest' -> BlockFetchSendBlocks block (Point block) m ()
-> m (BlockFetchSendBlocks block (Point block) m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (BlockFetchSendBlocks block (Point block) m ()
-> m (BlockFetchSendBlocks block (Point block) m ()))
-> BlockFetchSendBlocks block (Point block) m ()
-> m (BlockFetchSendBlocks block (Point block) m ())
forall a b. (a -> b) -> a -> b
$ m (BlockFetchServer block (Point block) m ())
-> BlockFetchSendBlocks block (Point block) m ()
forall (m :: * -> *) block point a.
m (BlockFetchServer block point m a)
-> BlockFetchSendBlocks block point m a
SendMsgBatchDone (BlockFetchServer block (Point block) m ()
-> m (BlockFetchServer block (Point block) m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (BlockFetchServer block (Point block) m ()
-> m (BlockFetchServer block (Point block) m ()))
-> BlockFetchServer block (Point block) m ()
-> m (BlockFetchServer block (Point block) m ())
forall a b. (a -> b) -> a -> b
$ RangeRequests m block -> BlockFetchServer block (Point block) m ()
forall (m :: * -> *) block.
Monad m =>
RangeRequests m block -> BlockFetchServer block (Point block) m ()
blockFetchServer RangeRequests m block
rangeRequest')
Right (block
block', Producer block m (RangeRequests m block)
stream') -> block
-> Producer block m (RangeRequests m block)
-> m (BlockFetchSendBlocks block (Point block) m ())
sendStream block
block' Producer block m (RangeRequests m block)
stream'