{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Ouroboros.Network.BlockFetch.ClientRegistry
  ( -- * Registry of block fetch clients
    FetchClientRegistry (..)
  , newFetchClientRegistry
  , bracketFetchClient
  , bracketKeepAliveClient
  , bracketSyncWithFetchClient
  , setFetchClientContext
  , FetchClientPolicy (..)
  , readFetchClientsStatus
  , readFetchClientsStateVars
  , readPeerGSVs
  ) where

import Data.Functor.Contravariant (contramap)
import Data.Map (Map)
import Data.Map qualified as Map
import Data.Set (Set)
import Data.Set qualified as Set

import Control.Concurrent.Class.MonadSTM.Strict
import Control.Exception (assert)
import Control.Monad (unless)
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork (MonadFork (throwTo),
           MonadThread (ThreadId, myThreadId))
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer)

import Ouroboros.Network.BlockFetch.ClientState
import Ouroboros.Network.DeltaQ
import Ouroboros.Network.Diffusion.Policies (deactivateTimeout)



-- | A registry for the threads that are executing the client side of the
-- 'BlockFetch' protocol to communicate with our peers.
--
-- The registry contains the shared variables we use to communicate with these
-- threads, both to track their status and to provide instructions.
--
-- The threads add\/remove themselves to\/from this registry when they start up
-- and shut down.
--
-- The field order is significant: the constructor is pattern matched
-- positionally by the bracket functions in this module.
--
data FetchClientRegistry peer header block m =
     FetchClientRegistry {
       -- | The tracer and the policy constructor handed to every fetch client.
       -- 'bracketFetchClient' reads this with 'readTMVar', so fetch clients
       -- block until the variable has been filled.
       fcrCtxVar
         :: StrictTMVar
              m ( Tracer m (TraceLabelPeer peer (TraceFetchClientState header))
                , WhetherReceivingTentativeBlocks
                    -> STM m (FetchClientPolicy header block m)
                ),

       -- | State variables of the currently registered fetch clients, keyed
       -- by peer. Entries are inserted and removed by 'bracketFetchClient'.
       fcrFetchRegistry
         :: StrictTVar  m (Map peer (FetchClientStateVars m header)),

       -- | Currently registered chain sync clients, keyed by peer. Each entry
       -- holds the sync client's thread id, its \"done\" variable (filled by
       -- the sync client when it unregisters) and its \"start\" variable
       -- (filled by the fetch client once it has noticed the sync client).
       fcrSyncRegistry
         :: StrictTVar  m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())),

       -- | Per-peer 'PeerGSV'. The keep alive client registers an entry for
       -- its peer and the fetch client waits for that entry before starting.
       fcrDqRegistry
         :: StrictTVar  m (Map peer PeerGSV),

       -- | Fetch clients that have registered with keep alive, keyed by peer:
       -- the fetch client's thread id and a variable the fetch client fills
       -- to signal that it is going away.
       fcrKeepRegistry
         :: StrictTVar  m (Map peer (ThreadId m, StrictTMVar m ())),

       -- | Peers for which a fetch client must not start, and for which a
       -- stopping fetch client cancels the chain sync client immediately
       -- rather than waiting for a clean exit. Per the comments in
       -- 'bracketFetchClient', membership means keep alive is dying.
       fcrDying
         :: StrictTVar m (Set peer)
     }

-- | Create a fresh registry with no registered clients.
--
-- The context variable starts out empty, so fetch clients started through
-- 'bracketFetchClient' block until it is filled. All per-peer maps and the
-- set of dying peers start out empty.
--
newFetchClientRegistry :: MonadSTM m
                       => m (FetchClientRegistry peer header block m)
newFetchClientRegistry =
    FetchClientRegistry
      <$> newEmptyTMVarIO      -- fcrCtxVar
      <*> newTVarIO Map.empty  -- fcrFetchRegistry
      <*> newTVarIO Map.empty  -- fcrSyncRegistry
      <*> newTVarIO Map.empty  -- fcrDqRegistry
      <*> newTVarIO Map.empty  -- fcrKeepRegistry
      <*> newTVarIO Set.empty  -- fcrDying

-- | This is needed to start a block fetch client. It provides the required
-- 'FetchClientContext'. It registers and unregisters the fetch client on
-- start and end.
--
-- It also manages synchronisation with the corresponding chain sync client.
--
-- Start up (@register@):
--
-- 1. In a single transaction: wait until no older fetch client is registered
--    for the peer, the peer is not in the dying set, the registry context is
--    available and keep alive has registered the peer; then register with
--    keep alive, build the policy for the negotiated version and register
--    the new state variables.
-- 2. Wait for the chain sync client of the same peer to register, release it
--    via its start variable and mark the fetch status as ready. If an
--    exception arrives while waiting, the registrations of step 1 are undone.
--
-- Shut down (@unregister@) runs under 'uninterruptibleMask': the status is set
-- to 'PeerFetchStatusShutdown', the chain sync client is given a chance to
-- finish (or cancelled immediately if the peer is in the dying set) and only
-- after its done variable is filled are the registry entries removed.
--
bracketFetchClient :: forall m a peer header block version.
                      (MonadFork m, MonadMask m, MonadTimer m, Ord peer)
                   => FetchClientRegistry peer header block m
                   -> version
                   -> (version -> WhetherReceivingTentativeBlocks)
                   -- ^ is pipelining enabled function
                   -> peer
                   -> (FetchClientContext header block m -> m a)
                   -> m a
bracketFetchClient (FetchClientRegistry ctxVar
                      fetchRegistry syncRegistry dqRegistry keepRegistry dyingRegistry)
                   version isPipeliningEnabled peer action = do
    -- Filled by us to tell keep alive that this fetch client is going away.
    ksVar <- newEmptyTMVarIO
    bracket (register ksVar) (uncurry (unregister ksVar)) (action . fst)
  where
    -- Returns the context for the fetch client together with the thread id
    -- and done variable of the chain sync client it is paired with.
    register :: StrictTMVar m ()
             -> m ( FetchClientContext header block m
                  , (ThreadId m, StrictTMVar m ()) )
    register ksVar = do
      tid <- myThreadId
      ctx <- atomically $ do
        -- wait for any potential older blockfetch to finish cleanup
        fr <- readTVar fetchRegistry
        check (peer `Map.notMember` fr)

        -- don't start if keepalive is attempting to die
        dr <- readTVar dyingRegistry
        check (peer `Set.notMember` dr)

        -- blocks until setFetchClientContext is called
        (tracer, mkPolicy) <- readTMVar ctxVar

        -- wait for and register with keepAlive
        dqPeers <- readTVar dqRegistry
        check (peer `Map.member` dqPeers)
        modifyTVar keepRegistry $ \m ->
          assert (peer `Map.notMember` m) $
          Map.insert peer (tid, ksVar) m

        -- allocate the policy specific for this peer's negotiated version
        policy <- do
          let pipeliningEnabled = isPipeliningEnabled version
          mkPolicy pipeliningEnabled

        stateVars <- newFetchClientStateVars
        modifyTVar fetchRegistry $ \m ->
          assert (peer `Map.notMember` m) $
          Map.insert peer stateVars m
        return FetchClientContext {
          fetchClientCtxTracer    = contramap (TraceLabelPeer peer) tracer,
          fetchClientCtxPolicy    = policy,
          fetchClientCtxStateVars = stateVars
          }

      -- Now wait for the sync client to start up.
      onException
        (atomically $ do
            syncclients <- readTVar syncRegistry
            case Map.lookup peer syncclients of
                 Nothing -> retry
                 Just (cTid, doneVar, startVar) -> do
                   -- let the sync client proceed past its own registration
                   putTMVar startVar ()
                   writeTVar (fetchClientStatusVar $ fetchClientCtxStateVars ctx)
                             (PeerFetchStatusReady Set.empty IsIdle)
                   return (ctx, (cTid, doneVar))
            )

        (atomically $ do
         -- we've been killed before the sync client started, cleanup
         writeTVar (fetchClientStatusVar $ fetchClientCtxStateVars ctx) PeerFetchStatusShutdown
         putTMVar ksVar ()
         modifyTVar keepRegistry $ \m ->
           assert (peer `Map.member` m) $
           Map.delete peer m

         modifyTVar fetchRegistry $ \m ->
           assert (peer `Map.member` m) $
           Map.delete peer m
         )

    -- Runs with asynchronous exceptions uninterruptibly masked; only the
    -- timed wait for the sync client is unmasked.
    unregister :: StrictTMVar m ()
               -> FetchClientContext header block m
               -> (ThreadId m, StrictTMVar m ())
               -> m ()
    unregister ksVar FetchClientContext { fetchClientCtxStateVars = stateVars }
               (tid, doneVar)  = uninterruptibleMask $ \unmask -> do
      dead <- do
        -- Signal we are shutting down
        dieFast <- atomically $ do
          writeTVar (fetchClientStatusVar stateVars) PeerFetchStatusShutdown

          dr <- readTVar dyingRegistry
          return $ Set.member peer dr

        -- If keepAlive is dying we don't need to let chainsync exit cleanly
        if dieFast
           then do
             throwTo tid AsyncCancelled
             atomically $ readTMVar doneVar >> cleanup
             return True
           else return False

      unless dead $ do
        -- Give the sync client a chance to exit cleanly before killing it.
        res <- onException
                 (unmask $ timeout deactivateTimeout $ atomically $ readTMVar doneVar)
                 (-- no time to wait, die die die!
                  uninterruptibleMask_ $ do
                    throwTo tid AsyncCancelled
                    atomically $ readTMVar doneVar >> cleanup
                 )
        case res of
          -- the sync client did not finish in time: cancel it and wait
          Nothing -> do
            throwTo tid AsyncCancelled
            atomically $ readTMVar doneVar >> cleanup
          -- the sync client is already done
          Just _  -> atomically cleanup
      where
        -- Remove this fetch client from the fetch and keep alive registries.
        cleanup :: STM m ()
        cleanup = do
          modifyTVar fetchRegistry $ \m ->
            assert (peer `Map.member` m) $
            Map.delete peer m

          -- Signal to keepAlive that we're going away
          putTMVar ksVar ()
          modifyTVar keepRegistry $ \m ->
            assert (peer `Map.member` m) $
            Map.delete peer m



-- | The block fetch and chain sync clients for each peer need to synchronise
-- their startup and shutdown. This bracket operation provides that
-- synchronisation for the chain sync client.
--
-- This must be used for the chain sync client /outside/ of its own state
-- registration and deregistration.
--
-- Before the action runs, the calling thread registers itself for the peer
-- and then blocks until the fetch client fills its start variable. When the
-- action ends (normally or by exception) the done variable is filled and the
-- registration is removed.
--
bracketSyncWithFetchClient :: forall m a peer header block.
                              (MonadSTM m, MonadFork m, MonadCatch m,
                               Ord peer)
                           => FetchClientRegistry peer header block m
                           -> peer
                           -> m a
                           -> m a
bracketSyncWithFetchClient FetchClientRegistry { fcrSyncRegistry = syncRegistry }
                           peer action = do
    doneVar  <- newEmptyTMVarIO
    startVar <- newEmptyTMVarIO
    bracket_ (register doneVar startVar) (unregister doneVar) action
  where
    -- The goal here is that the block fetch client should be registered
    -- before the sync client starts running.
    --
    -- On the shutdown side, the sync client should stop before the block fetch
    -- is unregistered. This has to happen even if either client is terminated
    -- abnormally or being cancelled (which of course can happen in any order).

    register :: StrictTMVar m () -> StrictTMVar m () -> m ()
    register doneVar startVar = do
      tid <- myThreadId
      -- We first register ourselves
      atomically $ do
        -- wait for any potential older chainsync clients to finish cleanup
        sr <- readTVar syncRegistry
        check (peer `Map.notMember` sr)

        modifyTVar syncRegistry $ \m ->
          assert (peer `Map.notMember` m) $
          Map.insert peer (tid, doneVar, startVar) m
      -- Then we wait for fetch to notice us. If we are interrupted while
      -- waiting we have to undo the registration ourselves.
      onException (atomically $ readTMVar startVar) (unregister doneVar)

    -- Signal that the sync client is done and drop its registry entry, in a
    -- single transaction.
    unregister :: StrictTMVar m () -> m ()
    unregister doneVar =
      atomically $ do
        putTMVar doneVar ()
        modifyTVar syncRegistry $ \m ->
          assert (peer `Map.member` m) $
          Map.delete peer m

-- | Run a keep-alive client for @peer@, registered in the
-- 'FetchClientRegistry' for the duration of the action.
--
-- On entry the peer is inserted into the 'PeerGSV' registry with
-- 'defaultGSV'; this blocks until no entry for the peer is present.  The
-- action is handed the 'PeerGSV' registry variable.
--
-- On exit (normal or exceptional, via 'bracket_') the peer's 'PeerGSV' entry
-- is removed again.  If a fetch client is still registered for the peer it is
-- cancelled with 'AsyncCancelled' first, and we wait for it to signal
-- completion before removing the entry.
--
bracketKeepAliveClient :: forall m a peer header block.
                              (MonadSTM m, MonadFork m, MonadMask m, Ord peer)
                       => FetchClientRegistry peer header block m
                       -> peer
                       -> (StrictTVar m (Map peer PeerGSV) -> m a)
                       -> m a
bracketKeepAliveClient (FetchClientRegistry _ctxVar
                              _fetchRegistry _syncRegistry dqRegistry keepRegistry dyingRegistry) peer action = do
    bracket_ register unregister (action dqRegistry)
  where
    -- the keepAliveClient will register a PeerGSV and the block fetch client will wait on it.
    register :: m ()
    register =
      atomically $ do
        -- Wait for previous keep alive client to cleanup
        dr <- readTVar dqRegistry
        check (peer `Map.notMember` dr)

        modifyTVar dqRegistry $ \m ->
          assert (peer `Map.notMember` m) $
          Map.insert peer defaultGSV m

    -- It is possible for the keepAlive client to keep running even without a fetch client, but
    -- a fetch client shouldn't run without a keepAlive client.
    --
    -- The whole cleanup runs under 'uninterruptibleMask_' so that it cannot
    -- be interrupted between cancelling the fetch client and removing the
    -- registry entries.
    unregister :: m ()
    unregister = uninterruptibleMask_ $ do
      fetchclient_m <- atomically $ do
        fetchclients <- readTVar keepRegistry
        case Map.lookup peer fetchclients of
             Nothing -> do
               -- If the fetch client is already dead we remove PeerGSV ourself directly.
               modifyTVar dqRegistry $ \m ->
                 assert (peer `Map.member` m) $
                 Map.delete peer m
               return Nothing
             Just rc -> do
               -- Prevent a new fetchclient from starting while we are killing the old one.
               modifyTVar dyingRegistry $ \s ->
                 assert (peer `Set.notMember` s) $
                 Set.insert peer s
               return $ Just rc
      case fetchclient_m of
           Nothing -> return ()
           Just (tid, doneVar) -> do
             -- Cancel the fetch client.
             throwTo tid AsyncCancelled
             atomically $ do
               -- wait for fetch client to exit.
               readTMVar doneVar
               modifyTVar dqRegistry $ \m ->
                 assert (peer `Map.member` m) $
                 Map.delete peer m
               modifyTVar dyingRegistry $ \s ->
                 assert (peer `Set.member` s) $
                 Set.delete peer s

-- | Install the tracer and the policy constructor into the registry's
-- context variable.
--
-- The context is stored with 'tryPutTMVar', so it can be set only once: a
-- second call finds the variable already full and fails with 'error'.
--
setFetchClientContext :: MonadSTM m
                      => FetchClientRegistry peer header block m
                      -> Tracer m (TraceLabelPeer peer (TraceFetchClientState header))
                      -> (   WhetherReceivingTentativeBlocks
                          -> STM m (FetchClientPolicy header block m)
                         )
                      -> m ()
setFetchClientContext (FetchClientRegistry ctxVar _ _ _ _ _) tracer mkPolicy =
    atomically $ do
      -- 'tryPutTMVar' returns False when the variable already holds a value.
      ok <- tryPutTMVar ctxVar (tracer, mkPolicy)
      unless ok $ error "setFetchClientContext: called more than once"

-- | A read-only 'STM' action to get the current 'PeerFetchStatus' for all
-- fetch clients in the 'FetchClientRegistry'.
--
-- Reads the fetch registry map and then reads each client's
-- 'fetchClientStatusVar', all within the same transaction.
--
readFetchClientsStatus :: MonadSTM m
                       => FetchClientRegistry peer header block m
                       -> STM m (Map peer (PeerFetchStatus header))
readFetchClientsStatus (FetchClientRegistry _ registry _ _ _ _) =
  readTVar registry >>= traverse (readTVar . fetchClientStatusVar)

-- | A read-only 'STM' action to get the 'FetchClientStateVars' for all fetch
-- clients in the 'FetchClientRegistry'.
--
-- This returns the variables themselves, keyed by peer, not their contents.
--
readFetchClientsStateVars :: MonadSTM m
                          => FetchClientRegistry peer header block m
                          -> STM m (Map peer (FetchClientStateVars m header))
readFetchClientsStateVars (FetchClientRegistry _ registry _ _ _ _) = readTVar registry

-- | A read-only 'STM' action to get the 'PeerGSV's for all fetch
-- clients in the 'FetchClientRegistry'.
--
-- Only peers present in both the 'PeerGSV' registry and the keep-alive
-- registry are returned; the values come from the 'PeerGSV' registry.
--
readPeerGSVs :: forall block header m peer.
                ( MonadSTM m, Ord peer)
             => FetchClientRegistry peer header block m
             -> STM m (Map peer PeerGSV)
readPeerGSVs (FetchClientRegistry _ _ _ dqRegistry keepRegistry _) = do
  dr <- readTVar dqRegistry
  kr <- readTVar keepRegistry
  -- The intersection gives us only the currently hot peers
  return $ Map.intersection dr kr