{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
{-# OPTIONS_GHC -Wno-incomplete-uni-patterns #-}
module Ouroboros.Network.BlockFetch.Client
(
blockFetchClient
, BlockFetchClient
, FetchClientContext
, TraceFetchClientState
, FetchRequest (..)
, FetchClientStateVars
, BlockFetchProtocolFailure
) where
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Exception (assert)
import Control.Monad (unless)
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Control.Tracer (traceWith)
import Ouroboros.Network.Block
import Network.TypedProtocol.Core
import Network.TypedProtocol.Peer.Client
import Ouroboros.Network.ControlMessage (ControlMessageSTM)
import Ouroboros.Network.Protocol.BlockFetch.Type
import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
import Ouroboros.Network.AnchoredFragment qualified as AF
import Ouroboros.Network.BlockFetch.ClientState (FetchClientContext (..),
FetchClientPolicy (..), FetchClientStateVars (..), FetchRequest (..),
FromConsensus (..), TraceFetchClientState (..),
acknowledgeFetchRequest, completeBlockDownload, completeFetchBatch,
fetchClientCtxStateVars, rejectedFetchBatch, startedFetchBatch)
import Ouroboros.Network.BlockFetch.DeltaQ (PeerFetchInFlightLimits (..),
PeerGSV (..))
import Ouroboros.Network.PeerSelection.PeerMetric.Type (FetchedMetricsTracer)
-- | Protocol violations detected by 'blockFetchClient' while it receives a
-- batch of blocks. Each one is raised with 'throwIO' from the streaming
-- receiver.
data BlockFetchProtocolFailure =
       -- | The batch was ended ('MsgBatchDone') while headers of the
       -- requested fragment were still waiting for their block.
       BlockFetchProtocolFailureTooFewBlocks

       -- | A block arrived ('MsgBlock') after every header of the requested
       -- fragment had already been matched.
     | BlockFetchProtocolFailureTooManyBlocks

       -- | The point of the received block differs from the point of the
       -- header that was expected next.
     | BlockFetchProtocolFailureWrongBlock

       -- | The policy's @blockMatchesHeader@ check rejected the received
       -- block for the expected header.
     | BlockFetchProtocolFailureInvalidBody
  deriving (Eq, Show)
-- | Lets 'BlockFetchProtocolFailure' values be thrown with 'throwIO', which
-- is how the block-fetch receiver in this module reports them. Only the
-- class defaults are used.
instance Exception BlockFetchProtocolFailure
-- | Shape of a block-fetch client: given the 'FetchClientContext' it should
-- work with, it yields a pipelined typed-protocol client for the
-- 'BlockFetch' protocol that starts in the 'BFIdle' state and returns an @a@.
type BlockFetchClient header block m a
  =  FetchClientContext header block m
  -> ClientPipelined (BlockFetch block (Point block)) BFIdle m a
-- | Client side of the block-fetch mini-protocol, driven by the state shared
-- through the 'FetchClientContext'.
--
-- The client repeatedly takes the next fetch request with
-- 'acknowledgeFetchRequest', pipelines one 'MsgRequestRange' per requested
-- fragment, and checks every block streamed back against the header it was
-- requested for. When 'acknowledgeFetchRequest' yields 'Nothing' the client
-- collects all outstanding pipelined responses and then sends
-- 'MsgClientDone'.
--
-- Arguments, in order:
--
-- * the negotiated version (currently ignored);
-- * the control-message action handed to 'acknowledgeFetchRequest';
-- * a tracer that receives @(block size, slot, monotonic time)@ for every
--   accepted block;
-- * the fetch client context (tracer, policy and state variables).
--
-- The receivers throw 'BlockFetchProtocolFailure' (via 'throwIO') when the
-- peer sends a block that does not match the expected header, or sends too
-- few or too many blocks for a requested range.
blockFetchClient :: forall header block versionNumber m.
                    (MonadSTM m, MonadThrow m, MonadTime m,
                     MonadMonotonicTime m, HasHeader header,
                     HasHeader block, HeaderHash header ~ HeaderHash block)
                 => versionNumber
                 -> ControlMessageSTM m
                 -> FetchedMetricsTracer m
                 -> FetchClientContext header block m
                 -> ClientPipelined (BlockFetch block (Point block)) BFIdle m ()
blockFetchClient _version controlMessageSTM reportFetched
                 FetchClientContext {
                   fetchClientCtxTracer    = tracer,
                   fetchClientCtxPolicy    = FetchClientPolicy {
                                               blockFetchSize,
                                               blockMatchesHeader,
                                               addFetchedBlock,
                                               headerForgeUTCTime
                                             },
                   fetchClientCtxStateVars = stateVars
                 } =
    ClientPipelined (senderAwait Zero)
  where
    -- Nothing left to send for the current request. While pipelined
    -- responses are outstanding, either collect one (and stay idle) or, if
    -- none is available yet, go and wait for the next request. With nothing
    -- outstanding, go straight to waiting.
    senderIdle :: forall n.
                  Nat n
               -> Client (BlockFetch block (Point block)) (Pipelined n ())
                         BFIdle m ()
    senderIdle (Succ outstanding) =
      Collect (Just (senderAwait (Succ outstanding)))
              (\_ -> senderIdle outstanding)
    senderIdle Zero = senderAwait Zero

    -- Obtain the next fetch request via 'acknowledgeFetchRequest'.
    -- 'Nothing' means the client has to shut down; otherwise start sending
    -- the fragments of the request.
    senderAwait :: forall n.
                   Nat n
                -> Client (BlockFetch block (Point block)) (Pipelined n ())
                          BFIdle m ()
    senderAwait outstanding =
      Effect $ do
        result <-
          acknowledgeFetchRequest tracer controlMessageSTM stateVars

        case result of
          Nothing -> do
            traceWith tracer (ClientTerminating $ natToInt outstanding)
            return $ senderTerminate outstanding
          Just (request, gsvs, inflightlimits) ->
            return $ senderActive outstanding gsvs inflightlimits
                                  (fetchRequestFragments request)

    -- Send one pipelined 'MsgRequestRange' per remaining fragment; once the
    -- list is exhausted fall back to 'senderIdle'.
    senderActive :: forall n.
                    Nat n
                 -> PeerGSV
                 -> PeerFetchInFlightLimits
                 -> [AnchoredFragment header]
                 -> Client (BlockFetch block (Point block)) (Pipelined n ())
                           BFIdle m ()
    senderActive outstanding gsvs inflightlimits (fragment:fragments) =
      Effect $ do
        -- The range runs from the point of 'AF.last' to the point of
        -- 'AF.head' of the fragment. The 'Right' patterns are partial
        -- (incomplete uni-pattern warnings are disabled for this module);
        -- the 'assert' states the expectation that the fragment is
        -- non-empty. The bang forces the range here, before anything is sent.
        let range :: ChainRange (Point header)
            !range = assert (not (AF.null fragment)) $
                     ChainRange (blockPoint lower)
                                (blockPoint upper)
              where
                Right lower = AF.last fragment
                Right upper = AF.head fragment

        traceWith tracer (SendFetchRequest fragment gsvs)
        return $
          YieldPipelined
            (MsgRequestRange (castRange range))
            (receiverBusy range fragment inflightlimits)
            (senderActive (Succ outstanding) gsvs inflightlimits fragments)

    senderActive outstanding _ _ [] = senderIdle outstanding

    -- Shut down: collect every outstanding pipelined response (blocking on
    -- each, since no alternative continuation is offered) and only then
    -- send 'MsgClientDone'.
    senderTerminate :: forall n.
                       Nat n
                    -> Client (BlockFetch block (Point block)) (Pipelined n ())
                              BFIdle m ()
    senderTerminate Zero =
      Yield MsgClientDone (Done ())
    senderTerminate (Succ n) =
      Collect Nothing
              (\_ -> senderTerminate n)

    -- First response to a range request.
    receiverBusy :: ChainRange (Point header)
                 -> AnchoredFragment header
                 -> PeerFetchInFlightLimits
                 -> Receiver (BlockFetch block (Point block))
                             BFBusy BFIdle m ()
    receiverBusy range fragment inflightlimits =
      ReceiverAwait $ \msg ->
        case msg of
          -- The peer answered the request with 'MsgNoBlocks': record the
          -- whole batch as rejected and finish this receiver.
          MsgNoBlocks ->
            ReceiverEffect $ do
              rejectedFetchBatch tracer blockFetchSize inflightlimits
                                 range headers stateVars
              return (ReceiverDone ())

          -- The peer starts streaming: record the batch start and expect
          -- the blocks for the fragment's headers, oldest first.
          MsgStartBatch ->
            ReceiverEffect $ do
              startedFetchBatch tracer inflightlimits range stateVars
              return (receiverStreaming inflightlimits range headers)
      where
        -- Headers of the requested fragment, oldest first; shared by both
        -- alternatives above.
        headers :: [header]
        headers = AF.toOldestFirst fragment

    -- Receive the blocks of a batch, pairing each incoming block with the
    -- next expected header.
    receiverStreaming :: PeerFetchInFlightLimits
                      -> ChainRange (Point header)
                      -> [header]
                      -> Receiver (BlockFetch block (Point block))
                                  BFStreaming BFIdle m ()
    receiverStreaming inflightlimits range headers =
      ReceiverAwait $ \msg ->
        case (msg, headers) of
          -- Batch finished exactly when no headers remain.
          (MsgBatchDone, []) -> ReceiverEffect $ do
            completeFetchBatch tracer inflightlimits range stateVars
            return (ReceiverDone ())

          (MsgBlock block, header:headers') -> ReceiverEffect $ do
            -- Both clocks are sampled before any validation work.
            now     <- getCurrentTime
            nowMono <- getMonotonicTime

            -- The block must be the one requested ...
            unless (blockPoint header == castPoint (blockPoint block)) $
              throwIO BlockFetchProtocolFailureWrongBlock

            -- ... and must pass the policy's header/body check.
            unless (blockMatchesHeader header block) $
              throwIO BlockFetchProtocolFailureInvalidBody

            -- NOTE(review): the block is handed to 'addFetchedBlock' before
            -- 'completeBlockDownload' updates the fetch state below; the
            -- order looks deliberate -- confirm before reordering.
            addFetchedBlock (castPoint (blockPoint header)) block

            -- Delay between the header's forge time and the wall-clock
            -- time sampled when the block arrived.
            forgeTime <- atomically $ headerForgeUTCTime $ FromConsensus header
            let blockDelay = diffUTCTime now forgeTime

            let hf     = getHeaderFields header
                slotNo = headerFieldSlot hf
            atomically $ traceWith reportFetched ( blockFetchSize header
                                                 , slotNo
                                                 , nowMono
                                                 )

            completeBlockDownload tracer blockFetchSize inflightlimits
                                  header blockDelay stateVars

            return (receiverStreaming inflightlimits range headers')

          -- Batch ended while headers were still expected.
          (MsgBatchDone, _:_) -> ReceiverEffect $
            throwIO BlockFetchProtocolFailureTooFewBlocks

          -- A block arrived although no header was left to match.
          (MsgBlock _, []) -> ReceiverEffect $
            throwIO BlockFetchProtocolFailureTooManyBlocks
-- | Re-tag both end points of a 'ChainRange' from block type @a@ to block
-- type @b@ with 'castPoint'. The two types must share their header hash
-- type, as the constraint requires.
castRange :: (HeaderHash a ~ HeaderHash b)
          => ChainRange (Point a) -> ChainRange (Point b)
castRange (ChainRange lower upper) =
    ChainRange (castPoint lower) (castPoint upper)