{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
#if __GLASGOW_HASKELL__ >= 908
{-# OPTIONS_GHC -Wno-x-partial #-}
#endif
module Ouroboros.Network.BlockFetch.ClientState
( FetchClientContext (..)
, FetchClientPolicy (..)
, FetchClientStateVars (..)
, newFetchClientStateVars
, readFetchClientState
, PeerFetchStatus (..)
, IsIdle (..)
, PeerFetchInFlight (..)
, initialPeerFetchInFlight
, FetchRequest (..)
, addNewFetchRequest
, acknowledgeFetchRequest
, startedFetchBatch
, completeBlockDownload
, completeFetchBatch
, rejectedFetchBatch
, TraceFetchClientState (..)
, TraceLabelPeer (..)
, ChainRange (..)
, FromConsensus (..)
) where
import Data.List as List (foldl')
import Data.Maybe (mapMaybe)
import Data.Semigroup (Last (..))
import Data.Set (Set)
import Data.Set qualified as Set
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Concurrent.Class.MonadSTM.Strict.TMergeVar
import Control.Exception (assert)
import Control.Monad (when)
import Control.Monad.Class.MonadTime.SI
import Control.Tracer (Tracer, traceWith)
import Network.Mux.Trace (TraceLabelPeer (..))
import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
import Ouroboros.Network.AnchoredFragment qualified as AF
import Ouroboros.Network.Block (HasHeader, MaxSlotNo (..), Point, blockPoint)
import Ouroboros.Network.BlockFetch.ConsensusInterface (FromConsensus (..))
import Ouroboros.Network.BlockFetch.DeltaQ (PeerFetchInFlightLimits (..),
PeerGSV, SizeInBytes, calculatePeerFetchInFlightLimits)
import Ouroboros.Network.ControlMessage (ControlMessageSTM,
timeoutWithControlMessage)
import Ouroboros.Network.Point (withOriginToMaybe)
import Ouroboros.Network.Protocol.BlockFetch.Type (ChainRange (..))
-- | Record grouping the three things a fetch client works with: a tracer for
-- 'TraceFetchClientState' events, a 'FetchClientPolicy' and the
-- 'FetchClientStateVars'.
data FetchClientContext header block m = FetchClientContext
  { fetchClientCtxTracer    :: Tracer m (TraceFetchClientState header)
    -- ^ Tracer accepting 'TraceFetchClientState' events.
  , fetchClientCtxPolicy    :: FetchClientPolicy header block m
    -- ^ The policy callbacks.
  , fetchClientCtxStateVars :: FetchClientStateVars m header
    -- ^ The mutable state variables.
  }
-- | Record of callbacks parameterising a fetch client.
--
-- NOTE(review): in the text this block was recovered from, the second field
-- had lost its name (only @:: header -> block -> Bool@ remained). It is
-- restored here as 'blockMatchesHeader' -- confirm against the upstream
-- module before merging.
data FetchClientPolicy header block m = FetchClientPolicy
  { blockFetchSize     :: header -> SizeInBytes
    -- ^ Maps a header to a size in bytes.
  , blockMatchesHeader :: header -> block -> Bool
    -- ^ Predicate relating a header and a block (presumably: does the
    -- received block correspond to the requested header -- TODO confirm).
  , addFetchedBlock    :: Point block -> block -> m ()
    -- ^ Action run in @m@ with a point and the block at that point.
  , blockForgeUTCTime  :: FromConsensus block -> STM m UTCTime
    -- ^ STM computation yielding a 'UTCTime' for a block.
  }
-- | The mutable variables associated with one fetch client: its status, its
-- in-flight accounting and its pending fetch request.
data FetchClientStateVars m header = FetchClientStateVars
  { fetchClientStatusVar   :: StrictTVar m (PeerFetchStatus header)
    -- ^ Holds the current 'PeerFetchStatus'.
  , fetchClientInFlightVar :: StrictTVar m (PeerFetchInFlight header)
    -- ^ Holds the current 'PeerFetchInFlight' accounting.
  , fetchClientRequestVar  :: TFetchRequestVar m header
    -- ^ Holds the fetch request (if any) posted for this client.
  }
-- | Allocate a fresh set of state variables: in-flight accounting starts at
-- 'initialPeerFetchInFlight', the status at 'PeerFetchStatusStarting', and
-- the request variable is created with 'newTFetchRequestVar'.
newFetchClientStateVars :: MonadSTM m => STM m (FetchClientStateVars m header)
newFetchClientStateVars = do
    -- The allocation order (in-flight, status, request) is kept as it was.
    inFlightVar <- newTVar initialPeerFetchInFlight
    statusVar   <- newTVar PeerFetchStatusStarting
    requestVar  <- newTFetchRequestVar
    pure FetchClientStateVars
      { fetchClientStatusVar   = statusVar
      , fetchClientInFlightVar = inFlightVar
      , fetchClientRequestVar  = requestVar
      }
-- | Read the status and the in-flight accounting of a client in a single STM
-- transaction. The variables record itself is returned as the third
-- component, unchanged.
readFetchClientState :: MonadSTM m
                     => FetchClientStateVars m header
                     -> STM m (PeerFetchStatus header,
                               PeerFetchInFlight header,
                               FetchClientStateVars m header)
readFetchClientState vars = do
    status   <- readTVar (fetchClientStatusVar vars)
    inflight <- readTVar (fetchClientInFlightVar vars)
    pure (status, inflight, vars)
-- | The status of a fetch client with respect to one peer.
--
-- Only 'PeerFetchStatusReady' carries data: a set of points and an 'IsIdle'
-- flag.
data PeerFetchStatus header
  = PeerFetchStatusShutdown
  | PeerFetchStatusStarting
    -- ^ The status a freshly created status variable holds (see
    -- 'newFetchClientStateVars').
  | PeerFetchStatusAberrant
  | PeerFetchStatusBusy
  | PeerFetchStatusReady (Set (Point header)) IsIdle
  deriving (Eq, Show)
-- | Two-valued flag carried by 'PeerFetchStatusReady'; a named alternative to
-- a bare 'Bool'.
data IsIdle
  = IsIdle
  | IsNotIdle
  deriving (Eq, Show)
-- | Convert a 'Bool' into an 'IsIdle': 'True' becomes 'IsIdle', 'False'
-- becomes 'IsNotIdle'.
idleIf :: Bool -> IsIdle
idleIf isIdle
  | isIdle    = IsIdle
  | otherwise = IsNotIdle
-- | Accounting of what is currently outstanding with one peer.
--
-- All fields except 'peerFetchBlocksInFlight' are strict.
data PeerFetchInFlight header = PeerFetchInFlight
  { peerFetchReqsInFlight   :: !Word
    -- ^ Request counter.
  , peerFetchBytesInFlight  :: !SizeInBytes
    -- ^ Byte counter.
  , peerFetchBlocksInFlight :: Set (Point header)
    -- ^ Points of the blocks accounted for (lazy field).
  , peerFetchMaxSlotNo      :: !MaxSlotNo
    -- ^ Running maximum slot number.
  }
  deriving (Eq, Show)
-- | The empty in-flight state: zero requests, zero bytes, no blocks and
-- 'NoMaxSlotNo'.
initialPeerFetchInFlight :: PeerFetchInFlight header
initialPeerFetchInFlight = PeerFetchInFlight
  { peerFetchReqsInFlight   = 0
  , peerFetchBytesInFlight  = 0
  , peerFetchBlocksInFlight = Set.empty
  , peerFetchMaxSlotNo      = NoMaxSlotNo
  }
-- | Fold a newly added fetch request into the in-flight accounting.
--
-- Arguments, in order:
--
-- 1. the function giving the size in bytes of a header's block;
-- 2. the old request, if any;
-- 3. the added request;
-- 4. the merged request;
-- 5. the in-flight state to update.
--
-- The request counter changes by the difference between the number of
-- fragments in the merged request and in the old request (0 if there was
-- none). Bytes, blocks and the max slot number are updated from the headers
-- of the /added/ request only.
--
-- Pre-condition (checked with 'assert'): none of the added headers' points is
-- already in 'peerFetchBlocksInFlight'.
addHeadersInFlight :: HasHeader header
                   => (header -> SizeInBytes)
                   -> Maybe (FetchRequest header)
                   -> FetchRequest header
                   -> FetchRequest header
                   -> PeerFetchInFlight header
                   -> PeerFetchInFlight header
addHeadersInFlight blockFetchSize oldReq addedReq mergedReq inflight =
    assert (all isNew addedHeaders) $
    PeerFetchInFlight {
      -- (current + merged) - old, in that order, on 'Word'.
      peerFetchReqsInFlight   = peerFetchReqsInFlight inflight
                              + numFetchReqs mergedReq
                              - maybe 0 numFetchReqs oldReq,

      peerFetchBytesInFlight  = peerFetchBytesInFlight inflight
                              + sum (map blockFetchSize addedHeaders),

      peerFetchBlocksInFlight = peerFetchBlocksInFlight inflight
                    `Set.union` Set.fromList (map blockPoint addedHeaders),

      peerFetchMaxSlotNo      = peerFetchMaxSlotNo inflight
                          `max` fetchRequestMaxSlotNo addedReq
    }
  where
    -- Every header of the added request, fragment by fragment, oldest first.
    addedHeaders = concatMap AF.toOldestFirst (fetchRequestFragments addedReq)

    -- A header is new when its point is not yet accounted for.
    isNew header =
      blockPoint header `Set.notMember` peerFetchBlocksInFlight inflight

    -- Number of fragments in a request.
    numFetchReqs :: FetchRequest header -> Word
    numFetchReqs = fromIntegral . length . fetchRequestFragments
-- | Remove a single header from the in-flight accounting: subtract its size
-- from 'peerFetchBytesInFlight' and delete its point from
-- 'peerFetchBlocksInFlight'. The other fields are left untouched.
--
-- Pre-conditions (checked with 'assert'): the bytes in flight are at least
-- the header's size, and the header's point is currently in flight.
deleteHeaderInFlight :: HasHeader header
                     => (header -> SizeInBytes)
                     -> header
                     -> PeerFetchInFlight header
                     -> PeerFetchInFlight header
deleteHeaderInFlight blockFetchSize header inflight =
    assert (bytesInFlight >= headerSize) $
    assert (headerPoint `Set.member` blocksInFlight) $
    inflight {
      peerFetchBytesInFlight  = bytesInFlight - headerSize,
      peerFetchBlocksInFlight = Set.delete headerPoint blocksInFlight
    }
  where
    headerSize     = blockFetchSize header
    headerPoint    = blockPoint header
    bytesInFlight  = peerFetchBytesInFlight inflight
    blocksInFlight = peerFetchBlocksInFlight inflight
-- | Remove several headers from the in-flight accounting by applying
-- 'deleteHeaderInFlight' to each one in list order, with a strict left fold.
deleteHeadersInFlight :: HasHeader header
                      => (header -> SizeInBytes)
                      -> [header]
                      -> PeerFetchInFlight header
                      -> PeerFetchInFlight header
deleteHeadersInFlight blockFetchSize headers inflight =
    List.foldl' dropHeader inflight headers
  where
    -- Accumulator first, as 'List.foldl'' expects.
    dropHeader acc header = deleteHeaderInFlight blockFetchSize header acc
-- | A fetch request: a list of header fragments.
newtype FetchRequest header = FetchRequest
  { fetchRequestFragments :: [AnchoredFragment header]
    -- ^ The fragments making up the request.
  }
  deriving Show
-- | Appending two requests concatenates their fragment lists, with one
-- refinement: when both lists are non-empty and the last fragment of the
-- left request can be joined ('AF.join') with the first fragment of the
-- right request, those two fragments are replaced by the joined fragment.
--
-- The implementation is total: it pattern matches instead of using
-- 'last' / 'head' / 'init' / 'tail', and traverses the left list once.
instance HasHeader header => Semigroup (FetchRequest header) where
  FetchRequest afs <> FetchRequest bfs = FetchRequest (appendJoining afs)
    where
      -- Copy the left list; on reaching its last fragment, try to join it
      -- with the head of the right list.
      appendJoining []  = bfs
      appendJoining [a]
        | b : bs <- bfs
        , Just f <- AF.join a b = f : bs
        | otherwise             = a : bfs
      appendJoining (a : rest)  = a : appendJoining rest
-- | The largest head slot number over all fragments of a request.
--
-- Fragments whose head slot is the origin contribute nothing; if no fragment
-- contributes (including the empty request) the result is 'NoMaxSlotNo'.
fetchRequestMaxSlotNo :: HasHeader header => FetchRequest header -> MaxSlotNo
fetchRequestMaxSlotNo =
      List.foldl' max NoMaxSlotNo
    . mapMaybe (fmap MaxSlotNo . withOriginToMaybe . AF.headSlot)
    . fetchRequestFragments
-- | Trace events about the state of a fetch client.
--
-- Most constructors carry the subject of the event followed by a snapshot of
-- the in-flight accounting, the in-flight limits and the peer status.
data TraceFetchClientState header
  = AddedFetchRequest
      (FetchRequest header)
      (PeerFetchInFlight header)
      PeerFetchInFlightLimits
      (PeerFetchStatus header)
    -- ^ Emitted by 'addNewFetchRequest' with the added request and the
    -- resulting in-flight state, limits and status.
  | AcknowledgedFetchRequest
      (FetchRequest header)
  | SendFetchRequest
      (AnchoredFragment header)
      PeerGSV
  | StartedFetchBatch
      (ChainRange (Point header))
      (PeerFetchInFlight header)
      PeerFetchInFlightLimits
      (PeerFetchStatus header)
  | CompletedBlockFetch
      (Point header)
      (PeerFetchInFlight header)
      PeerFetchInFlightLimits
      (PeerFetchStatus header)
      NominalDiffTime
      SizeInBytes
    -- ^ NOTE(review): the trailing 'NominalDiffTime' and 'SizeInBytes' are
    -- presumably a delay and the block size -- confirm at the use site.
  | CompletedFetchBatch
      (ChainRange (Point header))
      (PeerFetchInFlight header)
      PeerFetchInFlightLimits
      (PeerFetchStatus header)
  | RejectedFetchBatch
      (ChainRange (Point header))
      (PeerFetchInFlight header)
      PeerFetchInFlightLimits
      (PeerFetchStatus header)
  | ClientTerminating Int
  deriving Show
-- | Post a new fetch request for a client and update its accounting.
--
-- In one STM transaction this:
--
-- 1. peeks at the request currently held in the request variable;
-- 2. writes the added request with 'writeTFetchRequestVar', keeping the
--    request it returns;
-- 3. updates the in-flight variable with 'addHeadersInFlight';
-- 4. recomputes the status with 'updateCurrentStatus', using
--    'busyIfOverHighWatermark' on the limits derived from the given
--    'PeerGSV'.
--
-- After the transaction it traces 'AddedFetchRequest' and returns the new
-- status.
addNewFetchRequest :: (MonadSTM m, HasHeader header)
                   => Tracer m (TraceFetchClientState header)
                   -> (header -> SizeInBytes)
                   -> FetchRequest header
                   -> PeerGSV
                   -> FetchClientStateVars m header
                   -> m (PeerFetchStatus header)
addNewFetchRequest tracer blockFetchSize addedReq gsvs
                   FetchClientStateVars
                     { fetchClientRequestVar  = requestVar
                     , fetchClientInFlightVar = inFlightVar
                     , fetchClientStatusVar   = statusVar
                     } = do
    (newInFlight, newStatus) <- atomically $ do
      -- Look at the old request before overwriting / merging it.
      oldReq    <- peekTFetchRequestVar requestVar
      mergedReq <- writeTFetchRequestVar requestVar addedReq gsvs limits

      -- Force the updated accounting before storing it.
      current <- readTVar inFlightVar
      let !updated = addHeadersInFlight blockFetchSize
                                        oldReq addedReq mergedReq
                                        current
      writeTVar inFlightVar updated

      status <- updateCurrentStatus
                  (busyIfOverHighWatermark limits)
                  statusVar
                  updated
      pure (updated, status)

    traceWith tracer (AddedFetchRequest addedReq newInFlight limits newStatus)
    pure newStatus
  where
    -- In-flight limits derived from the peer's GSV.
    limits = calculatePeerFetchInFlightLimits gsvs
-- | Take the pending fetch request out of the peer's request var.
--
-- The take is run through 'timeoutWithControlMessage' with the supplied
-- 'ControlMessageSTM', so the result is 'Nothing' when that combinator gives
-- up on the take (presumably on a terminating control message; confirm
-- against 'timeoutWithControlMessage').
--
-- When a request is obtained, 'AcknowledgedFetchRequest' is traced and the
-- request is returned together with the 'PeerGSV' and
-- 'PeerFetchInFlightLimits' that were stored alongside it.
--
acknowledgeFetchRequest :: MonadSTM m
                        => Tracer m (TraceFetchClientState header)
                        -> ControlMessageSTM m
                        -> FetchClientStateVars m header
                        -> m (Maybe
                               ( FetchRequest header
                               , PeerGSV
                               , PeerFetchInFlightLimits ))
acknowledgeFetchRequest tracer controlMessageSTM
                        FetchClientStateVars {fetchClientRequestVar} = do
    result <-
      timeoutWithControlMessage controlMessageSTM
                                (takeTFetchRequestVar fetchClientRequestVar)
    case result of
      Nothing -> return result
      Just (request, _, _) -> do
        -- Only the request itself is traced; the result is passed on as is.
        traceWith tracer (AcknowledgedFetchRequest request)
        return result
-- | Trace that a batch for the given range has started.
--
-- This does not modify any state: it takes a consistent snapshot of the
-- in-flight state and the peer status in one STM transaction and reports
-- them, with the supplied limits, in a 'StartedFetchBatch' trace event.
--
startedFetchBatch :: MonadSTM m
                  => Tracer m (TraceFetchClientState header)
                  -> PeerFetchInFlightLimits
                  -> ChainRange (Point header)
                  -> FetchClientStateVars m header
                  -> m ()
startedFetchBatch tracer inflightlimits range
                  FetchClientStateVars {
                    fetchClientInFlightVar,
                    fetchClientStatusVar
                  } = do
    (inflight, currentStatus) <-
      atomically $ (,) <$> readTVar fetchClientInFlightVar
                       <*> readTVar fetchClientStatusVar
    traceWith tracer $
      StartedFetchBatch
        range
        inflight inflightlimits
        currentStatus
-- | Record that the block for the given header has been downloaded.
--
-- In one STM transaction the header is removed from the in-flight state (via
-- 'deleteHeaderInFlight') and the peer status is re-evaluated against the low
-- watermark. Afterwards a 'CompletedBlockFetch' event is traced, carrying the
-- block's point, the new in-flight state, the limits, the new status, the
-- supplied block delay and the block size as given by the size function.
--
completeBlockDownload :: (MonadSTM m, HasHeader header)
                      => Tracer m (TraceFetchClientState header)
                      -> (header -> SizeInBytes)
                      -> PeerFetchInFlightLimits
                      -> header
                      -> NominalDiffTime
                      -> FetchClientStateVars m header
                      -> m ()
completeBlockDownload tracer blockFetchSize inflightlimits header blockDelay
                      FetchClientStateVars {
                        fetchClientInFlightVar,
                        fetchClientStatusVar
                      } = do
    (inflight', currentStatus') <- atomically $ do
      inflight <- readTVar fetchClientInFlightVar
      -- The bang forces the new state before it is written to the var.
      let !inflight' = deleteHeaderInFlight blockFetchSize header inflight
      writeTVar fetchClientInFlightVar inflight'

      -- The peer is ready again once the bytes in flight are at or below
      -- the low watermark.
      currentStatus' <- updateCurrentStatus
                          (readyIfUnderLowWatermark inflightlimits)
                          fetchClientStatusVar
                          inflight'

      return (inflight', currentStatus')

    traceWith tracer $
      CompletedBlockFetch
        (blockPoint header)
        inflight' inflightlimits
        currentStatus'
        blockDelay
        (blockFetchSize header)
-- | Record that a whole batch (one request) has completed.
--
-- In one STM transaction the count of requests in flight is decremented and,
-- if the peer status is currently ready-but-not-idle with an empty block set
-- and no requests remain in flight, the status is switched to idle. Any other
-- status is left untouched. A 'CompletedFetchBatch' event with the updated
-- in-flight state and status is traced afterwards.
--
completeFetchBatch :: MonadSTM m
                   => Tracer m (TraceFetchClientState header)
                   -> PeerFetchInFlightLimits
                   -> ChainRange (Point header)
                   -> FetchClientStateVars m header
                   -> m ()
completeFetchBatch tracer inflightlimits range
                   FetchClientStateVars {
                     fetchClientInFlightVar,
                     fetchClientStatusVar
                   } = do
    (inflight', currentStatus') <- atomically $ do
      inflight <- readTVar fetchClientInFlightVar
      let !inflight' =
            -- Sanity check: when this is the last outstanding request there
            -- must be no bytes and no blocks left in flight.
            assert (   peerFetchReqsInFlight inflight /= 1
                    || (   peerFetchBytesInFlight inflight == 0
                        && Set.null (peerFetchBlocksInFlight inflight)))
            inflight {
              -- NOTE(review): the counter is a 'Word', so this would wrap
              -- if it were already 0; assumes every completion is paired
              -- with an earlier request -- confirm against callers.
              peerFetchReqsInFlight = peerFetchReqsInFlight inflight - 1
            }
      writeTVar fetchClientInFlightVar inflight'

      currentStatus' <- readTVar fetchClientStatusVar >>= \case
        PeerFetchStatusReady bs IsNotIdle
          | Set.null bs
            && 0 == peerFetchReqsInFlight inflight'
          -> let status = PeerFetchStatusReady Set.empty IsIdle
             in status <$ writeTVar fetchClientStatusVar status
        unchanged -> pure unchanged

      return (inflight', currentStatus')

    traceWith tracer $
      CompletedFetchBatch
        range
        inflight' inflightlimits
        currentStatus'
-- | Record that the peer rejected a batch.
--
-- In one STM transaction the given headers are removed from the in-flight
-- state (via 'deleteHeadersInFlight'), the count of requests in flight is
-- decremented, and the peer status is re-evaluated against the low
-- watermark. A 'RejectedFetchBatch' event is traced afterwards.
--
rejectedFetchBatch :: (MonadSTM m, HasHeader header)
                   => Tracer m (TraceFetchClientState header)
                   -> (header -> SizeInBytes)
                   -> PeerFetchInFlightLimits
                   -> ChainRange (Point header)
                   -> [header]
                   -> FetchClientStateVars m header
                   -> m ()
rejectedFetchBatch tracer blockFetchSize inflightlimits range headers
                   FetchClientStateVars {
                     fetchClientInFlightVar,
                     fetchClientStatusVar
                   } = do
    (inflight', currentStatus') <- atomically $ do
      inflight <- readTVar fetchClientInFlightVar
      let !inflight' =
            (deleteHeadersInFlight blockFetchSize headers inflight) {
              -- NOTE(review): 'Word' subtraction; would wrap if the counter
              -- were already 0 -- confirm callers only reject a request
              -- that was previously counted.
              peerFetchReqsInFlight = peerFetchReqsInFlight inflight - 1
            }
      writeTVar fetchClientInFlightVar inflight'

      -- The peer is ready again once the bytes in flight are at or below
      -- the low watermark.
      currentStatus' <- updateCurrentStatus
                          (readyIfUnderLowWatermark inflightlimits)
                          fetchClientStatusVar
                          inflight'

      return (inflight', currentStatus')

    traceWith tracer $
      RejectedFetchBatch
        range
        inflight' inflightlimits
        currentStatus'
-- | Compute the peer status from the given in-flight state using the
-- supplied decision function, store it in the status var, and return it.
--
-- The var is only written when the newly decided status differs from the
-- one currently stored.
--
updateCurrentStatus :: (MonadSTM m, HasHeader header)
                    => (PeerFetchInFlight header -> PeerFetchStatus header)
                    -> StrictTVar m (PeerFetchStatus header)
                    -> PeerFetchInFlight header
                    -> STM m (PeerFetchStatus header)
updateCurrentStatus decideCurrentStatus fetchClientStatusVar inflight = do
    let currentStatus' = decideCurrentStatus inflight
    currentStatus <- readTVar fetchClientStatusVar
    when (currentStatus' /= currentStatus) $
      writeTVar fetchClientStatusVar currentStatus'
    return currentStatus'
-- | Decide the peer status after in-flight data has grown.
--
-- The peer is 'PeerFetchStatusBusy' when the bytes in flight have reached
-- (are greater than or equal to) the high watermark. Otherwise it is
-- 'PeerFetchStatusReady' with the current set of blocks in flight, and an
-- idleness flag derived by 'idleIf' from whether zero requests are in flight.
--
busyIfOverHighWatermark :: PeerFetchInFlightLimits
                        -> PeerFetchInFlight header
                        -> PeerFetchStatus header
busyIfOverHighWatermark inflightlimits inflight
  | peerFetchBytesInFlight inflight >= inFlightBytesHighWatermark inflightlimits
  = PeerFetchStatusBusy
  | otherwise
  = PeerFetchStatusReady
      (peerFetchBlocksInFlight inflight)
      (idleIf (0 == peerFetchReqsInFlight inflight))
-- | Decide the peer status after in-flight data has shrunk.
--
-- The peer is 'PeerFetchStatusReady' when the bytes in flight are at or
-- below the low watermark, carrying the current set of blocks in flight and
-- an idleness flag derived by 'idleIf' from whether zero requests are in
-- flight. Otherwise it is 'PeerFetchStatusBusy'.
--
readyIfUnderLowWatermark :: PeerFetchInFlightLimits
                         -> PeerFetchInFlight header
                         -> PeerFetchStatus header
readyIfUnderLowWatermark inflightlimits inflight
  | peerFetchBytesInFlight inflight <= inFlightBytesLowWatermark inflightlimits
  = PeerFetchStatusReady
      (peerFetchBlocksInFlight inflight)
      (idleIf (0 == peerFetchReqsInFlight inflight))
  | otherwise
  = PeerFetchStatusBusy
-- | The variable through which fetch requests are handed to a fetch client.
--
-- It is a 'TMergeVar' holding the pending 'FetchRequest' together with the
-- 'PeerGSV' and 'PeerFetchInFlightLimits' that accompany it. The latter two
-- are wrapped in 'Last', so when values are combined through the tuple's
-- 'Semigroup' instance the most recently written GSV and limits are kept.
--
type TFetchRequestVar m header =
       TMergeVar m (FetchRequest header,
                    Last PeerGSV,
                    Last PeerFetchInFlightLimits)
-- | Create a new 'TFetchRequestVar' (a thin wrapper over 'newTMergeVar').
--
newTFetchRequestVar :: MonadSTM m => STM m (TFetchRequestVar m header)
newTFetchRequestVar = newTMergeVar
-- | Write a fetch request, with its 'PeerGSV' and in-flight limits, into the
-- var using 'writeTMergeVar'.
--
-- Returns the 'FetchRequest' component of the value that 'writeTMergeVar'
-- hands back; the GSV and limits components of that value are discarded.
--
writeTFetchRequestVar :: (MonadSTM m, HasHeader header)
                      => TFetchRequestVar m header
                      -> FetchRequest header
                      -> PeerGSV
                      -> PeerFetchInFlightLimits
                      -> STM m (FetchRequest header)
writeTFetchRequestVar v r g l = do
    (r', _, _) <- writeTMergeVar v (r, Last g, Last l)
    return r'
-- | Look at the fetch request currently held in the var, if any, using
-- 'tryReadTMergeVar'.
--
-- Only the 'FetchRequest' component is returned; the result is 'Nothing'
-- when 'tryReadTMergeVar' yields no value.
--
peekTFetchRequestVar :: MonadSTM m
                     => TFetchRequestVar m header
                     -> STM m (Maybe (FetchRequest header))
peekTFetchRequestVar v = fmap (\(x, _, _) -> x) <$> tryReadTMergeVar v
-- | Take the pending value out of the var with 'takeTMergeVar' and unwrap
-- the 'Last' wrappers, yielding the request with its plain 'PeerGSV' and
-- 'PeerFetchInFlightLimits'.
--
takeTFetchRequestVar :: MonadSTM m
                     => TFetchRequestVar m header
                     -> STM m (FetchRequest header,
                               PeerGSV,
                               PeerFetchInFlightLimits)
takeTFetchRequestVar v = (\(r, g, l) -> (r, getLast g, getLast l))
                     <$> takeTMergeVar v