{-# LANGUAGE BangPatterns        #-}
{-# LANGUAGE DataKinds           #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies        #-}
{-# LANGUAGE TypeOperators       #-}

-- hic sunt dracones!
{-# OPTIONS_GHC -Wno-incomplete-uni-patterns #-}

module Ouroboros.Network.BlockFetch.Client
  ( -- * Block fetch protocol client implementation
    blockFetchClient
  , BlockFetchClient
  , FetchClientContext
  , TraceFetchClientState
  , FetchRequest (..)
  , FetchClientStateVars
    -- * Exception types
  , BlockFetchProtocolFailure
  ) where

import Control.Concurrent.Class.MonadSTM.Strict
import Control.Exception (assert)
import Control.Monad (unless)
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI

import Control.Tracer (traceWith)

import Ouroboros.Network.Block

import Network.TypedProtocol.Core
import Network.TypedProtocol.Peer.Client
import Ouroboros.Network.ControlMessage (ControlMessageSTM)
import Ouroboros.Network.Protocol.BlockFetch.Type

import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
import Ouroboros.Network.AnchoredFragment qualified as AF
import Ouroboros.Network.BlockFetch.ClientState (FetchClientContext (..),
           FetchClientPolicy (..), FetchClientStateVars (..), FetchRequest (..),
           FromConsensus (..), TraceFetchClientState (..),
           acknowledgeFetchRequest, completeBlockDownload, completeFetchBatch,
           fetchClientCtxStateVars, rejectedFetchBatch, startedFetchBatch)
import Ouroboros.Network.BlockFetch.DeltaQ (PeerFetchInFlightLimits (..),
           PeerGSV (..))
import Ouroboros.Network.PeerSelection.PeerMetric.Type (FetchedMetricsTracer)


data BlockFetchProtocolFailure =
       BlockFetchProtocolFailureTooFewBlocks
     | BlockFetchProtocolFailureTooManyBlocks
     | BlockFetchProtocolFailureWrongBlock
     | BlockFetchProtocolFailureInvalidBody
  deriving (BlockFetchProtocolFailure -> BlockFetchProtocolFailure -> Bool
(BlockFetchProtocolFailure -> BlockFetchProtocolFailure -> Bool)
-> (BlockFetchProtocolFailure -> BlockFetchProtocolFailure -> Bool)
-> Eq BlockFetchProtocolFailure
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: BlockFetchProtocolFailure -> BlockFetchProtocolFailure -> Bool
== :: BlockFetchProtocolFailure -> BlockFetchProtocolFailure -> Bool
$c/= :: BlockFetchProtocolFailure -> BlockFetchProtocolFailure -> Bool
/= :: BlockFetchProtocolFailure -> BlockFetchProtocolFailure -> Bool
Eq, Int -> BlockFetchProtocolFailure -> ShowS
[BlockFetchProtocolFailure] -> ShowS
BlockFetchProtocolFailure -> String
(Int -> BlockFetchProtocolFailure -> ShowS)
-> (BlockFetchProtocolFailure -> String)
-> ([BlockFetchProtocolFailure] -> ShowS)
-> Show BlockFetchProtocolFailure
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> BlockFetchProtocolFailure -> ShowS
showsPrec :: Int -> BlockFetchProtocolFailure -> ShowS
$cshow :: BlockFetchProtocolFailure -> String
show :: BlockFetchProtocolFailure -> String
$cshowList :: [BlockFetchProtocolFailure] -> ShowS
showList :: [BlockFetchProtocolFailure] -> ShowS
Show)

instance Exception BlockFetchProtocolFailure


-- | TODO: use a fetch client wrapper type rather than the raw
--         PeerPipelined, and eliminate this alias. It is only here
--         to avoid large types leaking into the consensus layer.
type BlockFetchClient header block m a =
  FetchClientContext header block m ->
  ClientPipelined (BlockFetch block (Point block)) BFIdle m a

-- | The implementation of the client side of block fetch protocol designed to
-- work in conjunction with our fetch logic.
--
blockFetchClient :: forall header block versionNumber m.
                    (MonadSTM m, MonadThrow m, MonadTime m,
                     MonadMonotonicTime m, HasHeader header,
                     HasHeader block, HeaderHash header ~ HeaderHash block)
                 => versionNumber
                 -> ControlMessageSTM m
                 -> FetchedMetricsTracer m
                 -> FetchClientContext header block m
                 -> ClientPipelined (BlockFetch block (Point block)) BFIdle m ()
blockFetchClient :: forall header block versionNumber (m :: * -> *).
(MonadSTM m, MonadThrow m, MonadTime m, MonadMonotonicTime m,
 HasHeader header, HasHeader block,
 HeaderHash header ~ HeaderHash block) =>
versionNumber
-> ControlMessageSTM m
-> FetchedMetricsTracer m
-> FetchClientContext header block m
-> ClientPipelined (BlockFetch block (Point block)) 'BFIdle m ()
blockFetchClient versionNumber
_version ControlMessageSTM m
controlMessageSTM FetchedMetricsTracer m
reportFetched
                 FetchClientContext {
                   fetchClientCtxTracer :: forall header block (m :: * -> *).
FetchClientContext header block m
-> Tracer m (TraceFetchClientState header)
fetchClientCtxTracer    = Tracer m (TraceFetchClientState header)
tracer,
                   fetchClientCtxPolicy :: forall header block (m :: * -> *).
FetchClientContext header block m
-> FetchClientPolicy header block m
fetchClientCtxPolicy    = FetchClientPolicy {
                                               header -> SizeInBytes
blockFetchSize :: header -> SizeInBytes
blockFetchSize :: forall header block (m :: * -> *).
FetchClientPolicy header block m -> header -> SizeInBytes
blockFetchSize,
                                               header -> block -> Bool
blockMatchesHeader :: header -> block -> Bool
blockMatchesHeader :: forall header block (m :: * -> *).
FetchClientPolicy header block m -> header -> block -> Bool
blockMatchesHeader,
                                               Point block -> block -> m ()
addFetchedBlock :: Point block -> block -> m ()
addFetchedBlock :: forall header block (m :: * -> *).
FetchClientPolicy header block m -> Point block -> block -> m ()
addFetchedBlock,
                                               FromConsensus header -> STM m UTCTime
headerForgeUTCTime :: FromConsensus header -> STM m UTCTime
headerForgeUTCTime :: forall header block (m :: * -> *).
FetchClientPolicy header block m
-> FromConsensus header -> STM m UTCTime
headerForgeUTCTime
                                             },
                   fetchClientCtxStateVars :: forall header block (m :: * -> *).
FetchClientContext header block m -> FetchClientStateVars m header
fetchClientCtxStateVars = FetchClientStateVars m header
stateVars
                 } =
    Client
  (BlockFetch block (Point block)) ('Pipelined 'Z ()) 'BFIdle m ()
-> ClientPipelined (BlockFetch block (Point block)) 'BFIdle m ()
forall ps (st :: ps) (m :: * -> *) a c.
Client ps ('Pipelined 'Z c) st m a -> ClientPipelined ps st m a
ClientPipelined (Nat 'Z
-> Client
     (BlockFetch block (Point block)) ('Pipelined 'Z ()) 'BFIdle m ()
forall (n :: N).
Nat n
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
senderAwait Nat 'Z
forall (n :: N). ('Z ~ n) => Nat n
Zero)
  where
    senderIdle :: forall n.
                  Nat n
               -> Client (BlockFetch block (Point block)) (Pipelined n ())
                         BFIdle m ()

    -- We have no requests to send. Check if we have any pending pipelined
    -- results to collect. If so, go round and collect any more. If not, block
    -- and wait for some new requests.
    senderIdle :: forall (n :: N).
Nat n
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
senderIdle (Succ Nat n
outstanding) =
      Maybe
  (Client
     (BlockFetch block (Point block))
     ('Pipelined ('S n) ())
     'BFIdle
     m
     ())
-> (()
    -> Client
         (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ())
-> Client
     (BlockFetch block (Point block))
     ('Pipelined ('S n) ())
     'BFIdle
     m
     ()
forall ps (st :: ps) (n :: N) c (m :: * -> *) a.
(StateTokenI st, ActiveState st) =>
Maybe (Client ps ('Pipelined ('S n) c) st m a)
-> (c -> Client ps ('Pipelined n c) st m a)
-> Client ps ('Pipelined ('S n) c) st m a
Collect (Client
  (BlockFetch block (Point block))
  ('Pipelined ('S n) ())
  'BFIdle
  m
  ()
-> Maybe
     (Client
        (BlockFetch block (Point block))
        ('Pipelined ('S n) ())
        'BFIdle
        m
        ())
forall a. a -> Maybe a
Just (Nat ('S n)
-> Client
     (BlockFetch block (Point block))
     ('Pipelined ('S n) ())
     'BFIdle
     m
     ()
forall (n :: N).
Nat n
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
senderAwait (Nat n -> Nat ('S n)
forall (m :: N) (n :: N). (m ~ 'S n) => Nat n -> Nat m
Succ Nat n
outstanding)))
              (\()
_ -> Nat n
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
forall (n :: N).
Nat n
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
senderIdle Nat n
outstanding)

    -- And similarly if there are no pending pipelined results at all.
    senderIdle Nat n
Zero = Nat n
-> Peer
     (BlockFetch block (Point block))
     'AsClient
     ('Pipelined n ())
     'BFIdle
     m
     ()
forall (n :: N).
Nat n
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
senderAwait Nat n
forall (n :: N). ('Z ~ n) => Nat n
Zero

    senderAwait :: forall n.
                   Nat n
                -> Client (BlockFetch block (Point block)) (Pipelined n ()) BFIdle m ()
    senderAwait :: forall (n :: N).
Nat n
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
senderAwait Nat n
outstanding =
      m (Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ())
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
forall ps (pl :: IsPipelined) (st :: ps) (m :: * -> *) a.
m (Client ps pl st m a) -> Client ps pl st m a
Effect (m (Client
      (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ())
 -> Client
      (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ())
-> m (Client
        (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ())
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
forall a b. (a -> b) -> a -> b
$ do
      -- Atomically grab our next request and update our tracking state.
      -- We have now accepted this request.
      --
      -- It is important to note that we only update our tracking state when
      -- we /accept/ the request, not when the fetch logic /sets/ the request.
      -- The fetching logic can update the request up until the point where
      -- we accept it here. From here on the request is considered to be
      -- in-flight, and the tracking state that the fetch logic uses now
      -- reflects that.
      --
      result <-
          Tracer m (TraceFetchClientState header)
-> ControlMessageSTM m
-> FetchClientStateVars m header
-> m (Maybe
        (FetchRequest header, PeerGSV, PeerFetchInFlightLimits))
forall (m :: * -> *) header.
MonadSTM m =>
Tracer m (TraceFetchClientState header)
-> ControlMessageSTM m
-> FetchClientStateVars m header
-> m (Maybe
        (FetchRequest header, PeerGSV, PeerFetchInFlightLimits))
acknowledgeFetchRequest Tracer m (TraceFetchClientState header)
tracer ControlMessageSTM m
controlMessageSTM FetchClientStateVars m header
stateVars

      case result of
        Maybe (FetchRequest header, PeerGSV, PeerFetchInFlightLimits)
Nothing -> do
          Tracer m (TraceFetchClientState header)
-> TraceFetchClientState header -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceFetchClientState header)
tracer (Int -> TraceFetchClientState header
forall header. Int -> TraceFetchClientState header
ClientTerminating (Int -> TraceFetchClientState header)
-> Int -> TraceFetchClientState header
forall a b. (a -> b) -> a -> b
$ Nat n -> Int
forall (n :: N). Nat n -> Int
natToInt Nat n
outstanding)
          Client
  (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
-> m (Client
        (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Client
   (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
 -> m (Client
         (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()))
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
-> m (Client
        (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ())
forall a b. (a -> b) -> a -> b
$ Nat n
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
forall (n :: N).
Nat n
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
senderTerminate Nat n
outstanding
        Just (FetchRequest header
request, PeerGSV
gsvs, PeerFetchInFlightLimits
inflightlimits) ->
          Client
  (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
-> m (Client
        (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Client
   (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
 -> m (Client
         (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()))
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
-> m (Client
        (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ())
forall a b. (a -> b) -> a -> b
$ Nat n
-> PeerGSV
-> PeerFetchInFlightLimits
-> [AnchoredFragment header]
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
forall (n :: N).
Nat n
-> PeerGSV
-> PeerFetchInFlightLimits
-> [AnchoredFragment header]
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
senderActive Nat n
outstanding PeerGSV
gsvs PeerFetchInFlightLimits
inflightlimits
                                (FetchRequest header -> [AnchoredFragment header]
forall header. FetchRequest header -> [AnchoredFragment header]
fetchRequestFragments FetchRequest header
request)

    senderActive :: forall n.
                    Nat n
                 -> PeerGSV
                 -> PeerFetchInFlightLimits
                 -> [AnchoredFragment header]
                 -> Client (BlockFetch block (Point block)) (Pipelined n ()) BFIdle m ()

    -- We now do have some requests that we have accepted but have yet to
    -- actually send out. Lets send out the first one.
    senderActive :: forall (n :: N).
Nat n
-> PeerGSV
-> PeerFetchInFlightLimits
-> [AnchoredFragment header]
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
senderActive Nat n
outstanding PeerGSV
gsvs PeerFetchInFlightLimits
inflightlimits (AnchoredFragment header
fragment:[AnchoredFragment header]
fragments) =
      m (Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ())
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
forall ps (pl :: IsPipelined) (st :: ps) (m :: * -> *) a.
m (Client ps pl st m a) -> Client ps pl st m a
Effect (m (Client
      (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ())
 -> Client
      (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ())
-> m (Client
        (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ())
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
forall a b. (a -> b) -> a -> b
$ do
        {-
        now <- getMonotonicTime
        --TODO: should we pair this up with the senderAwait earlier?
        inFlight  <- readTVar fetchClientInFlightVar

        let blockTrailingEdges =
              blockArrivalShedule
                gsvs
                inFlight
                (map snd fragment)

        timeout <- newTimeout (head blockTrailingEdges)
        fork $ do
          fired <- awaitTimeout timeout
          when fired $
            atomically (writeTVar _ PeerFetchStatusAberrant)
        -}
        let range :: ChainRange (Point header)
            !range :: ChainRange (Point header)
range = Bool -> ChainRange (Point header) -> ChainRange (Point header)
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (Bool -> Bool
not (AnchoredFragment header -> Bool
forall v a b. AnchoredSeq v a b -> Bool
AF.null AnchoredFragment header
fragment)) (ChainRange (Point header) -> ChainRange (Point header))
-> ChainRange (Point header) -> ChainRange (Point header)
forall a b. (a -> b) -> a -> b
$
                     Point header -> Point header -> ChainRange (Point header)
forall point. point -> point -> ChainRange point
ChainRange (header -> Point header
forall block. HasHeader block => block -> Point block
blockPoint header
lower)
                                (header -> Point header
forall block. HasHeader block => block -> Point block
blockPoint header
upper)
              where
                Right header
lower = AnchoredFragment header -> Either (Anchor header) header
forall v a b. Anchorable v a b => AnchoredSeq v a b -> Either a b
AF.last AnchoredFragment header
fragment
                Right header
upper = AnchoredFragment header -> Either (Anchor header) header
forall v a b. Anchorable v a b => AnchoredSeq v a b -> Either a b
AF.head AnchoredFragment header
fragment

        Tracer m (TraceFetchClientState header)
-> TraceFetchClientState header -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceFetchClientState header)
tracer (AnchoredFragment header -> PeerGSV -> TraceFetchClientState header
forall header.
AnchoredFragment header -> PeerGSV -> TraceFetchClientState header
SendFetchRequest AnchoredFragment header
fragment PeerGSV
gsvs)
        Client
  (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
-> m (Client
        (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Client
   (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
 -> m (Client
         (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()))
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
-> m (Client
        (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ())
forall a b. (a -> b) -> a -> b
$
          Message (BlockFetch block (Point block)) 'BFIdle 'BFBusy
-> Receiver (BlockFetch block (Point block)) 'BFBusy 'BFIdle m ()
-> Client
     (BlockFetch block (Point block))
     ('Pipelined ('S n) ())
     'BFIdle
     m
     ()
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
forall ps (st :: ps) (n :: N) c (m :: * -> *) a (st' :: ps)
       (st'' :: ps).
(StateTokenI st, StateTokenI st',
 StateAgency st ~ 'ClientAgency) =>
Message ps st st'
-> Receiver ps st' st'' m c
-> Client ps ('Pipelined ('S n) c) st'' m a
-> Client ps ('Pipelined n c) st m a
YieldPipelined
            (ChainRange (Point block)
-> Message (BlockFetch block (Point block)) 'BFIdle 'BFBusy
forall point block.
ChainRange point
-> Message (BlockFetch block point) 'BFIdle 'BFBusy
MsgRequestRange (ChainRange (Point header) -> ChainRange (Point block)
forall a b.
(HeaderHash a ~ HeaderHash b) =>
ChainRange (Point a) -> ChainRange (Point b)
castRange ChainRange (Point header)
range))
            (ChainRange (Point header)
-> AnchoredFragment header
-> PeerFetchInFlightLimits
-> Receiver (BlockFetch block (Point block)) 'BFBusy 'BFIdle m ()
receiverBusy ChainRange (Point header)
range AnchoredFragment header
fragment PeerFetchInFlightLimits
inflightlimits)
            (Nat ('S n)
-> PeerGSV
-> PeerFetchInFlightLimits
-> [AnchoredFragment header]
-> Client
     (BlockFetch block (Point block))
     ('Pipelined ('S n) ())
     'BFIdle
     m
     ()
forall (n :: N).
Nat n
-> PeerGSV
-> PeerFetchInFlightLimits
-> [AnchoredFragment header]
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
senderActive (Nat n -> Nat ('S n)
forall (m :: N) (n :: N). (m ~ 'S n) => Nat n -> Nat m
Succ Nat n
outstanding) PeerGSV
gsvs PeerFetchInFlightLimits
inflightlimits [AnchoredFragment header]
fragments)

    -- And when we run out, go back to idle.
    senderActive Nat n
outstanding PeerGSV
_ PeerFetchInFlightLimits
_ [] = Nat n
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
forall (n :: N).
Nat n
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
senderIdle Nat n
outstanding


    -- Terminate the sender; 'controlMessageSTM' returned 'Terminate'.
    senderTerminate :: forall n.
                       Nat n
                    -> Client (BlockFetch block (Point block)) (Pipelined n ()) BFIdle m ()
    senderTerminate :: forall (n :: N).
Nat n
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
senderTerminate Nat n
Zero =
      Message (BlockFetch block (Point block)) 'BFIdle 'BFDone
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFDone m ()
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
forall ps (pl :: IsPipelined) (st :: ps) (m :: * -> *) a
       (st' :: ps).
(StateTokenI st, StateTokenI st', StateAgency st ~ 'ClientAgency,
 Outstanding pl ~ 'Z) =>
Message ps st st' -> Client ps pl st' m a -> Client ps pl st m a
Yield Message (BlockFetch block (Point block)) 'BFIdle 'BFDone
forall block point.
Message (BlockFetch block point) 'BFIdle 'BFDone
MsgClientDone (()
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFDone m ()
forall ps (pl :: IsPipelined) (st :: ps) (m :: * -> *) a.
(StateTokenI st, StateAgency st ~ 'NobodyAgency,
 Outstanding pl ~ 'Z) =>
a -> Client ps pl st m a
Done ())
    senderTerminate (Succ Nat n
n) =
      Maybe
  (Client
     (BlockFetch block (Point block))
     ('Pipelined ('S n) ())
     'BFIdle
     m
     ())
-> (()
    -> Client
         (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ())
-> Client
     (BlockFetch block (Point block))
     ('Pipelined ('S n) ())
     'BFIdle
     m
     ()
forall ps (st :: ps) (n :: N) c (m :: * -> *) a.
(StateTokenI st, ActiveState st) =>
Maybe (Client ps ('Pipelined ('S n) c) st m a)
-> (c -> Client ps ('Pipelined n c) st m a)
-> Client ps ('Pipelined ('S n) c) st m a
Collect Maybe
  (Client
     (BlockFetch block (Point block))
     ('Pipelined ('S n) ())
     'BFIdle
     m
     ())
forall a. Maybe a
Nothing
              (\()
_ -> Nat n
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
forall (n :: N).
Nat n
-> Client
     (BlockFetch block (Point block)) ('Pipelined n ()) 'BFIdle m ()
senderTerminate Nat n
n)


    receiverBusy :: ChainRange (Point header)
                 -> AnchoredFragment header
                 -> PeerFetchInFlightLimits
                 -> Receiver (BlockFetch block (Point block))
                             BFBusy BFIdle m ()
    receiverBusy :: ChainRange (Point header)
-> AnchoredFragment header
-> PeerFetchInFlightLimits
-> Receiver (BlockFetch block (Point block)) 'BFBusy 'BFIdle m ()
receiverBusy ChainRange (Point header)
range AnchoredFragment header
fragment PeerFetchInFlightLimits
inflightlimits =
      (forall (st' :: BlockFetch block (Point block)).
 Message (BlockFetch block (Point block)) 'BFBusy st'
 -> Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
-> Receiver (BlockFetch block (Point block)) 'BFBusy 'BFIdle m ()
forall ps (st :: ps) (stdone :: ps) (m :: * -> *) c.
(StateTokenI st, ActiveState st, StateAgency st ~ 'ServerAgency) =>
(forall (st' :: ps).
 Message ps st st' -> Receiver ps st' stdone m c)
-> Receiver ps st stdone m c
ReceiverAwait ((forall (st' :: BlockFetch block (Point block)).
  Message (BlockFetch block (Point block)) 'BFBusy st'
  -> Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
 -> Receiver (BlockFetch block (Point block)) 'BFBusy 'BFIdle m ())
-> (forall (st' :: BlockFetch block (Point block)).
    Message (BlockFetch block (Point block)) 'BFBusy st'
    -> Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
-> Receiver (BlockFetch block (Point block)) 'BFBusy 'BFIdle m ()
forall a b. (a -> b) -> a -> b
$ \Message (BlockFetch block (Point block)) 'BFBusy st'
msg ->
        case Message (BlockFetch block (Point block)) 'BFBusy st'
msg of
          -- The server is reporting that the range we asked for does not exist.
          -- This can happen (even if we didn't make any mistakes) if their
          -- chain forked in the time between when they told us and when we
          -- asked for this range of blocks. If this happens, it should
          -- certainly be the case that this peer doesn't continue to tell us
          -- that this range of blocks is in their chain.
          --
          -- FIXME: For now we will not do the detailed error checking to check
          -- that the peer is not cheating us. Nor will we track these failure
          -- points to make sure we do not ask for extensions of this again.
          Message (BlockFetch block (Point block)) 'BFBusy st'
R:MessageBlockFetchfromto block (Point block) 'BFBusy st'
MsgNoBlocks   ->
            m (Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
-> Receiver (BlockFetch block (Point block)) st' 'BFIdle m ()
forall ps (st :: ps) (stdone :: ps) (m :: * -> *) c.
m (Receiver ps st stdone m c) -> Receiver ps st stdone m c
ReceiverEffect (m (Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
 -> Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
-> m (Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
-> Receiver (BlockFetch block (Point block)) st' 'BFIdle m ()
forall a b. (a -> b) -> a -> b
$ do
              -- Update our in-flight stats and our current status
              Tracer m (TraceFetchClientState header)
-> (header -> SizeInBytes)
-> PeerFetchInFlightLimits
-> ChainRange (Point header)
-> [header]
-> FetchClientStateVars m header
-> m ()
forall (m :: * -> *) header.
(MonadSTM m, HasHeader header) =>
Tracer m (TraceFetchClientState header)
-> (header -> SizeInBytes)
-> PeerFetchInFlightLimits
-> ChainRange (Point header)
-> [header]
-> FetchClientStateVars m header
-> m ()
rejectedFetchBatch Tracer m (TraceFetchClientState header)
tracer header -> SizeInBytes
blockFetchSize PeerFetchInFlightLimits
inflightlimits
                                 ChainRange (Point header)
range [header]
headers FetchClientStateVars m header
stateVars
              Receiver (BlockFetch block (Point block)) st' 'BFIdle m ()
-> m (Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (() -> Receiver (BlockFetch block (Point block)) st' st' m ()
forall ps (stdone :: ps) (m :: * -> *) c.
c -> Receiver ps stdone stdone m c
ReceiverDone ())
            where
              headers :: [header]
headers = AnchoredFragment header -> [header]
forall v a b. AnchoredSeq v a b -> [b]
AF.toOldestFirst AnchoredFragment header
fragment

          Message (BlockFetch block (Point block)) 'BFBusy st'
R:MessageBlockFetchfromto block (Point block) 'BFBusy st'
MsgStartBatch ->
            m (Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
-> Receiver (BlockFetch block (Point block)) st' 'BFIdle m ()
forall ps (st :: ps) (stdone :: ps) (m :: * -> *) c.
m (Receiver ps st stdone m c) -> Receiver ps st stdone m c
ReceiverEffect (m (Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
 -> Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
-> m (Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
-> Receiver (BlockFetch block (Point block)) st' 'BFIdle m ()
forall a b. (a -> b) -> a -> b
$ do
              Tracer m (TraceFetchClientState header)
-> PeerFetchInFlightLimits
-> ChainRange (Point header)
-> FetchClientStateVars m header
-> m ()
forall (m :: * -> *) header.
MonadSTM m =>
Tracer m (TraceFetchClientState header)
-> PeerFetchInFlightLimits
-> ChainRange (Point header)
-> FetchClientStateVars m header
-> m ()
startedFetchBatch Tracer m (TraceFetchClientState header)
tracer PeerFetchInFlightLimits
inflightlimits ChainRange (Point header)
range FetchClientStateVars m header
stateVars
              Receiver (BlockFetch block (Point block)) st' 'BFIdle m ()
-> m (Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (PeerFetchInFlightLimits
-> ChainRange (Point header)
-> [header]
-> Receiver
     (BlockFetch block (Point block)) 'BFStreaming 'BFIdle m ()
receiverStreaming PeerFetchInFlightLimits
inflightlimits ChainRange (Point header)
range [header]
headers)
            where
              headers :: [header]
headers = AnchoredFragment header -> [header]
forall v a b. AnchoredSeq v a b -> [b]
AF.toOldestFirst AnchoredFragment header
fragment

    receiverStreaming :: PeerFetchInFlightLimits
                      -> ChainRange (Point header)
                      -> [header]
                      -> Receiver (BlockFetch block (Point block))
                                  BFStreaming BFIdle m ()
    receiverStreaming :: PeerFetchInFlightLimits
-> ChainRange (Point header)
-> [header]
-> Receiver
     (BlockFetch block (Point block)) 'BFStreaming 'BFIdle m ()
receiverStreaming PeerFetchInFlightLimits
inflightlimits ChainRange (Point header)
range [header]
headers =
      (forall (st' :: BlockFetch block (Point block)).
 Message (BlockFetch block (Point block)) 'BFStreaming st'
 -> Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
-> Receiver
     (BlockFetch block (Point block)) 'BFStreaming 'BFIdle m ()
forall ps (st :: ps) (stdone :: ps) (m :: * -> *) c.
(StateTokenI st, ActiveState st, StateAgency st ~ 'ServerAgency) =>
(forall (st' :: ps).
 Message ps st st' -> Receiver ps st' stdone m c)
-> Receiver ps st stdone m c
ReceiverAwait ((forall (st' :: BlockFetch block (Point block)).
  Message (BlockFetch block (Point block)) 'BFStreaming st'
  -> Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
 -> Receiver
      (BlockFetch block (Point block)) 'BFStreaming 'BFIdle m ())
-> (forall (st' :: BlockFetch block (Point block)).
    Message (BlockFetch block (Point block)) 'BFStreaming st'
    -> Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
-> Receiver
     (BlockFetch block (Point block)) 'BFStreaming 'BFIdle m ()
forall a b. (a -> b) -> a -> b
$ \Message (BlockFetch block (Point block)) 'BFStreaming st'
msg ->
        case (Message (BlockFetch block (Point block)) 'BFStreaming st'
msg, [header]
headers) of
          (Message (BlockFetch block (Point block)) 'BFStreaming st'
R:MessageBlockFetchfromto block (Point block) 'BFStreaming st'
MsgBatchDone, []) -> m (Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
-> Receiver (BlockFetch block (Point block)) st' 'BFIdle m ()
forall ps (st :: ps) (stdone :: ps) (m :: * -> *) c.
m (Receiver ps st stdone m c) -> Receiver ps st stdone m c
ReceiverEffect (m (Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
 -> Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
-> m (Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
-> Receiver (BlockFetch block (Point block)) st' 'BFIdle m ()
forall a b. (a -> b) -> a -> b
$ do
            Tracer m (TraceFetchClientState header)
-> PeerFetchInFlightLimits
-> ChainRange (Point header)
-> FetchClientStateVars m header
-> m ()
forall (m :: * -> *) header.
MonadSTM m =>
Tracer m (TraceFetchClientState header)
-> PeerFetchInFlightLimits
-> ChainRange (Point header)
-> FetchClientStateVars m header
-> m ()
completeFetchBatch Tracer m (TraceFetchClientState header)
tracer PeerFetchInFlightLimits
inflightlimits ChainRange (Point header)
range FetchClientStateVars m header
stateVars
            Receiver (BlockFetch block (Point block)) st' 'BFIdle m ()
-> m (Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (() -> Receiver (BlockFetch block (Point block)) st' st' m ()
forall ps (stdone :: ps) (m :: * -> *) c.
c -> Receiver ps stdone stdone m c
ReceiverDone ())


          (MsgBlock block
block, header
header:[header]
headers') -> m (Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
-> Receiver (BlockFetch block (Point block)) st' 'BFIdle m ()
forall ps (st :: ps) (stdone :: ps) (m :: * -> *) c.
m (Receiver ps st stdone m c) -> Receiver ps st stdone m c
ReceiverEffect (m (Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
 -> Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
-> m (Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
-> Receiver (BlockFetch block (Point block)) st' 'BFIdle m ()
forall a b. (a -> b) -> a -> b
$ do
            now <- m UTCTime
forall (m :: * -> *). MonadTime m => m UTCTime
getCurrentTime
            nowMono <- getMonotonicTime
            --TODO: consider how to enforce expected block size limit.
            -- They've lied and are sending us a massive amount of data.
            -- Resource consumption attack.

            {-
            -- Now it's totally possible that the timeout already fired
            -- if not, we can update it, making sure the delay is > 0
            now <- getMonotonicTime
            updateTimeout timeout (diffTime now )
            -}

            unless (blockPoint header == castPoint (blockPoint block)) $
              throwIO BlockFetchProtocolFailureWrongBlock

            -- This is moderately expensive.
            unless (blockMatchesHeader header block) $
              throwIO BlockFetchProtocolFailureInvalidBody

            -- write it to the volatile block store
            --FIXME: this is not atomic wrt the in-flight and status updates
            -- above. This would allow a read where the block is no longer
            -- in-flight but is still not in the fetched block store.
            -- either 1. make it atomic, or 2. do this first, or 3. some safe
            -- interleaving

            -- Add the block to the chain DB, notifying of any new chains.
            addFetchedBlock (castPoint (blockPoint header)) block

            forgeTime <- atomically $ headerForgeUTCTime $ FromConsensus header
            let blockDelay = UTCTime -> UTCTime -> NominalDiffTime
diffUTCTime UTCTime
now UTCTime
forgeTime

            let hf = header -> HeaderFields header
forall b. HasHeader b => b -> HeaderFields b
getHeaderFields header
header
                slotNo = HeaderFields header -> SlotNo
forall k (b :: k). HeaderFields b -> SlotNo
headerFieldSlot HeaderFields header
hf
            atomically $ traceWith reportFetched ( blockFetchSize header
                                                 , slotNo
                                                 , nowMono
                                                 )

            -- Note that we add the block to the chain DB /before/ updating our
            -- current status and in-flight stats. Otherwise blocks will
            -- disappear from our in-flight set without yet appearing in the
            -- fetched block set. The fetch logic would conclude it has to
            -- download the missing block(s) again.

            -- Update our in-flight stats and our current status
            completeBlockDownload tracer blockFetchSize inflightlimits
                                  header blockDelay stateVars

            return (receiverStreaming inflightlimits range headers')

          (Message (BlockFetch block (Point block)) 'BFStreaming st'
R:MessageBlockFetchfromto block (Point block) 'BFStreaming st'
MsgBatchDone, header
_:[header]
_) -> m (Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
-> Receiver (BlockFetch block (Point block)) st' 'BFIdle m ()
forall ps (st :: ps) (stdone :: ps) (m :: * -> *) c.
m (Receiver ps st stdone m c) -> Receiver ps st stdone m c
ReceiverEffect (m (Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
 -> Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
-> m (Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
-> Receiver (BlockFetch block (Point block)) st' 'BFIdle m ()
forall a b. (a -> b) -> a -> b
$
            BlockFetchProtocolFailure
-> m (Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO BlockFetchProtocolFailure
BlockFetchProtocolFailureTooFewBlocks

          (MsgBlock block
_, []) -> m (Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
-> Receiver (BlockFetch block (Point block)) st' 'BFIdle m ()
forall ps (st :: ps) (stdone :: ps) (m :: * -> *) c.
m (Receiver ps st stdone m c) -> Receiver ps st stdone m c
ReceiverEffect (m (Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
 -> Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
-> m (Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
-> Receiver (BlockFetch block (Point block)) st' 'BFIdle m ()
forall a b. (a -> b) -> a -> b
$
            BlockFetchProtocolFailure
-> m (Receiver (BlockFetch block (Point block)) st' 'BFIdle m ())
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO BlockFetchProtocolFailure
BlockFetchProtocolFailureTooManyBlocks

castRange :: (HeaderHash a ~ HeaderHash b)
          => ChainRange (Point a) -> ChainRange (Point b)
castRange :: forall a b.
(HeaderHash a ~ HeaderHash b) =>
ChainRange (Point a) -> ChainRange (Point b)
castRange (ChainRange Point a
l Point a
u) = Point b -> Point b -> ChainRange (Point b)
forall point. point -> point -> ChainRange point
ChainRange (Point a -> Point b
forall {k1} {k2} (b :: k1) (b' :: k2).
Coercible (HeaderHash b) (HeaderHash b') =>
Point b -> Point b'
castPoint Point a
l) (Point a -> Point b
forall {k1} {k2} (b :: k1) (b' :: k2).
Coercible (HeaderHash b) (HeaderHash b') =>
Point b -> Point b'
castPoint Point a
u)