{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Ouroboros.Network.BlockFetch.ClientRegistry
  ( -- * Registry of block fetch clients
    FetchClientRegistry (..)
  , newFetchClientRegistry
  , bracketFetchClient
  , bracketSyncWithFetchClient
  , setFetchClientContext
  , FetchClientPolicy (..)
  , readFetchClientsStatus
  , readFetchClientsStateVars
    -- * KeepAlive registry
  , module KeepAlive
  ) where

import Data.Functor.Contravariant (contramap)
import Data.Map (Map)
import Data.Map qualified as Map
import Data.Set qualified as Set

import Control.Concurrent.Class.MonadSTM.Strict
import Control.Exception (assert)
import Control.Monad (unless)
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork (MonadFork (throwTo),
           MonadThread (ThreadId, myThreadId))
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer)

import Ouroboros.Network.BlockFetch.ClientState
import Ouroboros.Network.Diffusion.Policies (deactivateTimeout)
import Ouroboros.Network.KeepAlive.Registry as KeepAlive



-- | A registry for the threads that are executing the client side of the
-- 'BlockFetch' protocol to communicate with our peers.
--
-- The registry contains the shared variables we use to communicate with these
-- threads, both to track their status and to provide instructions.
--
-- The threads add\/remove themselves to\/from this registry when they start up
-- and shut down.
--
data FetchClientRegistry peer header block m =
     FetchClientRegistry {
       -- | The tracer and the 'STM' action that produces the fetch client
       -- policy. Created empty by 'newFetchClientRegistry' and filled by
       -- 'setFetchClientContext'; 'bracketFetchClient' reads it when
       -- registering a client.
       ctxVar
         :: StrictTMVar
              m ( Tracer m (TraceLabelPeer peer (TraceFetchClientState header))
                , STM m (FetchClientPolicy header block m)
                ),
       -- | State variables of the currently registered block fetch clients,
       -- keyed by peer.
       fetchRegistry
         :: StrictTVar  m (Map peer (FetchClientStateVars m header)),
       -- | Currently registered chain sync clients, keyed by peer. Each entry
       -- is the sync client's thread id, its \"done\" variable and its
       -- \"start\" variable (in that order), as inserted by
       -- 'bracketSyncWithFetchClient'.
       syncRegistry
         :: StrictTVar  m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ()))
    }


-- | Create a fresh 'FetchClientRegistry': the context variable starts out
-- empty (it is filled later by 'setFetchClientContext') and both the fetch
-- and the sync registries start out as empty maps.
--
newFetchClientRegistry :: MonadSTM m
                       => m (FetchClientRegistry peer header block m)
newFetchClientRegistry = FetchClientRegistry <$> newEmptyTMVarIO
                                             <*> newTVarIO Map.empty
                                             <*> newTVarIO Map.empty

-- | This is needed to start a block fetch client. It provides the required
-- 'FetchClientContext'. It registers and unregisters the fetch client on
-- start and end.
--
-- It also manages synchronisation with the corresponding chain sync client:
-- registration only completes once a sync client for the same peer is present
-- in the sync registry, and unregistration waits for (or forces) that sync
-- client to signal that it is done before the fetch client is removed.
--
-- The @version@ argument is currently not used by this function.
--
bracketFetchClient :: forall m a peer header block version.
                      (MonadFork m, MonadMask m, MonadTimer m, Ord peer)
                   => FetchClientRegistry peer header block m
                   -> KeepAliveRegistry peer m
                   -> version
                   -> peer
                   -> (FetchClientContext header block m -> m a)
                   -> m a
bracketFetchClient FetchClientRegistry { ctxVar, fetchRegistry, syncRegistry }
                   KeepAliveRegistry { dqRegistry, keepRegistry, dyingRegistry }
                   _version peer action = do
    -- Variable registered with keepAlive; it is filled when this fetch client
    -- goes away.
    ksVar <- newEmptyTMVarIO
    fst <$> generalBracket (register ksVar) (unregister ksVar) (action . fst)
  where
    -- How long to wait for the sync client to finish when we are exiting
    -- because of an exception (any 'ExitCase' other than 'ExitCaseSuccess').
    onExceptionTimeout :: DiffTime
    onExceptionTimeout = 1

    -- Register this fetch client and wait for the matching sync client.
    -- Returns the context for the client together with the sync client's
    -- thread id and its "done" variable, which 'unregister' needs.
    register :: StrictTMVar m ()
             -> m ( FetchClientContext header block m
                  , (ThreadId m, StrictTMVar m ()) )
    register ksVar = do
      tid <- myThreadId
      ctx <- atomically $ do
        -- wait for any potential older blockfetch to finish cleanup
        fr <- readTVar fetchRegistry
        check (peer `Map.notMember` fr)

        -- don't start if keepalive is attempting to die
        dr <- readTVar dyingRegistry
        check (peer `Set.notMember` dr)

        -- blocks until setFetchClientContext is called
        (tracer, mkPolicy) <- readTMVar ctxVar

        -- wait for and register with keepAlive
        dqPeers <- readTVar dqRegistry
        check (peer `Map.member` dqPeers)
        modifyTVar keepRegistry $ \reg ->
          assert (peer `Map.notMember` reg) $
          Map.insert peer (tid, ksVar) reg

        -- allocate the policy for this peer by running the STM action that
        -- was supplied via setFetchClientContext
        policy <- mkPolicy

        stateVars <- newFetchClientStateVars
        modifyTVar fetchRegistry $ \reg ->
          assert (peer `Map.notMember` reg) $
          Map.insert peer stateVars reg
        return FetchClientContext {
          fetchClientCtxTracer    = contramap (TraceLabelPeer peer) tracer,
          fetchClientCtxPolicy    = policy,
          fetchClientCtxStateVars = stateVars
          }

      -- Now wait for the sync client to start up.
      onException
        (atomically $ do
            syncclients <- readTVar syncRegistry
            case Map.lookup peer syncclients of
                 Nothing -> retry
                 Just (cTid, doneVar, startVar) -> do
                   -- let the sync client proceed and mark ourselves ready
                   putTMVar startVar ()
                   writeTVar (fetchClientStatusVar $ fetchClientCtxStateVars ctx)
                             (PeerFetchStatusReady Set.empty IsIdle)
                   return (ctx, (cTid, doneVar))
            )

        (atomically $ do
         -- we've been killed before the sync client started, cleanup
         writeTVar (fetchClientStatusVar $ fetchClientCtxStateVars ctx) PeerFetchStatusShutdown
         putTMVar ksVar ()
         modifyTVar keepRegistry $ \reg ->
           assert (peer `Map.member` reg) $
           Map.delete peer reg

         modifyTVar fetchRegistry $ \reg ->
           assert (peer `Map.member` reg) $
           Map.delete peer reg
         )

    -- Release action of the bracket. Marks the client as shut down, makes
    -- sure the sync client has signalled its "done" variable (killing it with
    -- 'AsyncCancelled' if necessary) and then removes this client from the
    -- fetch and keepAlive registries.
    unregister :: StrictTMVar m ()
               -> ( FetchClientContext header block m
                  , (ThreadId m, StrictTMVar m ()) )
               -> ExitCase a
               -> m ()
    unregister ksVar (FetchClientContext { fetchClientCtxStateVars = stateVars },
                      (tid, doneVar)) exitCase  = uninterruptibleMask $ \unmask -> do
      let timeoutLimit = case exitCase of
                              ExitCaseSuccess _ -> deactivateTimeout
                              _                 -> onExceptionTimeout

      -- Signal we are shutting down
      dieFast <- atomically $ do
        writeTVar (fetchClientStatusVar stateVars) PeerFetchStatusShutdown

        dr <- readTVar dyingRegistry
        return $ Set.member peer dr

      if dieFast
         -- If keepAlive is dying we don't need to let chainsync exit cleanly
         then killSyncAndCleanup
         else do
           -- Give the sync client a chance to exit cleanly before killing it.
           res <- onException
                   (unmask $ timeout timeoutLimit $ atomically $ readTMVar doneVar)
                   (-- no time to wait, die die die!
                    uninterruptibleMask_ killSyncAndCleanup
                   )
           case res of
                  -- the sync client did not finish within the time limit
                  Nothing -> killSyncAndCleanup
                  -- the sync client finished on its own
                  Just _  -> atomically cleanup
     where
       -- Cancel the sync client's thread, wait until it has signalled that it
       -- is done, and then deregister this fetch client.
       killSyncAndCleanup = do
         throwTo tid AsyncCancelled
         atomically $ readTMVar doneVar >> cleanup

       cleanup = do
         modifyTVar fetchRegistry $ \reg ->
           assert (peer `Map.member` reg) $
           Map.delete peer reg

         -- Signal to keepAlive that we're going away
         putTMVar ksVar ()
         modifyTVar keepRegistry $ \reg ->
           assert (peer `Map.member` reg) $
           Map.delete peer reg


-- | The block fetch and chain sync clients for each peer need to synchronise
-- their startup and shutdown. This bracket operation provides that
-- synchronisation for the chain sync client.
--
-- This must be used for the chain sync client /outside/ of its own state
-- registration and deregistration.
--
bracketSyncWithFetchClient :: forall m a peer header block.
                              (MonadSTM m, MonadFork m, MonadCatch m,
                               Ord peer)
                           => FetchClientRegistry peer header block m
                           -> peer
                           -> m a
                           -> m a
bracketSyncWithFetchClient FetchClientRegistry { syncRegistry } peer action = do
    -- doneVar is filled by 'unregister' when the sync client goes away;
    -- startVar is waited on in 'register' before the action is run.
    doneVar <- newEmptyTMVarIO
    startVar <- newEmptyTMVarIO
    bracket_ (register doneVar startVar) (unregister doneVar) action
  where
    -- The goal here is that the block fetch client should be registered
    -- before the sync client starts running.
    --
    -- On the shutdown side, the sync client should stop before the block fetch
    -- is unregistered. This has to happen even if either client is terminated
    -- abnormally or being cancelled (which of course can happen in any order).

    -- Add this thread to the sync registry, then block until the start
    -- variable has been filled.
    register :: StrictTMVar m () -> StrictTMVar m () -> m ()
    register doneVar startVar = do
      tid <- myThreadId
      -- We first register ourselves
      atomically $ do
        -- wait for any potential older chainsync clients to finish cleanup
        sr <- readTVar syncRegistry
        check (peer `Map.notMember` sr)

        modifyTVar syncRegistry $ \reg ->
          assert (peer `Map.notMember` reg) $
          Map.insert peer (tid, doneVar, startVar) reg
      -- Then we wait for fetch to notice us; if we are interrupted while
      -- waiting, undo the registration above.
      onException (atomically $ readTMVar startVar) (unregister doneVar)

    -- Signal that the sync client is done and remove it from the registry.
    unregister :: StrictTMVar m () -> m ()
    unregister doneVar =
      atomically $ do
        putTMVar doneVar ()
        modifyTVar syncRegistry $ \reg ->
          assert (peer `Map.member` reg) $
          Map.delete peer reg

-- | Store the tracer and the 'STM' action producing the 'FetchClientPolicy'
-- in the registry's context variable.
--
-- 'bracketFetchClient' reads this variable while registering a client, so it
-- blocks until this function has been called. The variable may only be filled
-- once: a second call fails with 'error'.
--
setFetchClientContext :: MonadSTM m
                      => FetchClientRegistry peer header block m
                      -> Tracer m (TraceLabelPeer peer (TraceFetchClientState header))
                      -> STM m (FetchClientPolicy header block m)
                      -> m ()
setFetchClientContext FetchClientRegistry { ctxVar } tracer mkPolicy =
    atomically $ do
      -- tryPutTMVar reports False when the variable is already full
      ok <- tryPutTMVar ctxVar (tracer, mkPolicy)
      unless ok $ error "setFetchClientContext: called more than once"

-- | A read-only 'STM' action to get the current 'PeerFetchStatus' for all
-- fetch clients in the 'FetchClientRegistry'.
--
-- The result has one entry per peer currently present in the fetch registry,
-- obtained by reading each client's status variable in the same transaction.
--
readFetchClientsStatus :: MonadSTM m
                       => FetchClientRegistry peer header block m
                       -> STM m (Map peer (PeerFetchStatus header))
readFetchClientsStatus FetchClientRegistry { fetchRegistry } =
  readTVar fetchRegistry >>= traverse (readTVar . fetchClientStatusVar)

-- | A read-only 'STM' action to get the 'FetchClientStateVars' for all fetch
-- clients in the 'FetchClientRegistry'.
--
-- This returns the current contents of the fetch registry as-is; the
-- individual state variables are not read.
--
readFetchClientsStateVars :: MonadSTM m
                          => FetchClientRegistry peer header block m
                          -> STM m (Map peer (FetchClientStateVars m header))
readFetchClientsStateVars FetchClientRegistry { fetchRegistry } = readTVar fetchRegistry