{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Ouroboros.Network.BlockFetch.ClientRegistry
(
FetchClientRegistry (..)
, newFetchClientRegistry
, bracketFetchClient
, bracketKeepAliveClient
, bracketSyncWithFetchClient
, setFetchClientContext
, FetchClientPolicy (..)
, readFetchClientsStatus
, readFetchClientsStateVars
, readPeerGSVs
) where
import Data.Functor.Contravariant (contramap)
import Data.Map (Map)
import Data.Map qualified as Map
import Data.Set (Set)
import Data.Set qualified as Set
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Exception (assert)
import Control.Monad (unless)
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork (MonadFork (throwTo),
MonadThread (ThreadId, myThreadId))
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer)
import Ouroboros.Network.BlockFetch.ClientState
import Ouroboros.Network.DeltaQ
import Ouroboros.Network.Diffusion.Policies (deactivateTimeout)
data FetchClientRegistry peer header block m =
FetchClientRegistry {
forall peer header block (m :: * -> *).
FetchClientRegistry peer header block m
-> StrictTMVar
m
(Tracer m (TraceLabelPeer peer (TraceFetchClientState header)),
STM m (FetchClientPolicy header block m))
fcrCtxVar
:: StrictTMVar
m ( Tracer m (TraceLabelPeer peer (TraceFetchClientState header))
, STM m (FetchClientPolicy header block m)
),
forall peer header block (m :: * -> *).
FetchClientRegistry peer header block m
-> StrictTVar m (Map peer (FetchClientStateVars m header))
fcrFetchRegistry
:: StrictTVar m (Map peer (FetchClientStateVars m header)),
forall peer header block (m :: * -> *).
FetchClientRegistry peer header block m
-> StrictTVar
m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ()))
fcrSyncRegistry
:: StrictTVar m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())),
forall peer header block (m :: * -> *).
FetchClientRegistry peer header block m
-> StrictTVar m (Map peer PeerGSV)
fcrDqRegistry
:: StrictTVar m (Map peer PeerGSV),
forall peer header block (m :: * -> *).
FetchClientRegistry peer header block m
-> StrictTVar m (Map peer (ThreadId m, StrictTMVar m ()))
fcrKeepRegistry
:: StrictTVar m (Map peer (ThreadId m, StrictTMVar m ())),
forall peer header block (m :: * -> *).
FetchClientRegistry peer header block m -> StrictTVar m (Set peer)
fcrDying
:: StrictTVar m (Set peer)
}
newFetchClientRegistry :: MonadSTM m
=> m (FetchClientRegistry peer header block m)
newFetchClientRegistry :: forall (m :: * -> *) peer header block.
MonadSTM m =>
m (FetchClientRegistry peer header block m)
newFetchClientRegistry = StrictTMVar
m
(Tracer m (TraceLabelPeer peer (TraceFetchClientState header)),
STM m (FetchClientPolicy header block m))
-> StrictTVar m (Map peer (FetchClientStateVars m header))
-> StrictTVar
m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ()))
-> StrictTVar m (Map peer PeerGSV)
-> StrictTVar m (Map peer (ThreadId m, StrictTMVar m ()))
-> StrictTVar m (Set peer)
-> FetchClientRegistry peer header block m
forall peer header block (m :: * -> *).
StrictTMVar
m
(Tracer m (TraceLabelPeer peer (TraceFetchClientState header)),
STM m (FetchClientPolicy header block m))
-> StrictTVar m (Map peer (FetchClientStateVars m header))
-> StrictTVar
m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ()))
-> StrictTVar m (Map peer PeerGSV)
-> StrictTVar m (Map peer (ThreadId m, StrictTMVar m ()))
-> StrictTVar m (Set peer)
-> FetchClientRegistry peer header block m
FetchClientRegistry (StrictTMVar
m
(Tracer m (TraceLabelPeer peer (TraceFetchClientState header)),
STM m (FetchClientPolicy header block m))
-> StrictTVar m (Map peer (FetchClientStateVars m header))
-> StrictTVar
m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ()))
-> StrictTVar m (Map peer PeerGSV)
-> StrictTVar m (Map peer (ThreadId m, StrictTMVar m ()))
-> StrictTVar m (Set peer)
-> FetchClientRegistry peer header block m)
-> m (StrictTMVar
m
(Tracer m (TraceLabelPeer peer (TraceFetchClientState header)),
STM m (FetchClientPolicy header block m)))
-> m (StrictTVar m (Map peer (FetchClientStateVars m header))
-> StrictTVar
m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ()))
-> StrictTVar m (Map peer PeerGSV)
-> StrictTVar m (Map peer (ThreadId m, StrictTMVar m ()))
-> StrictTVar m (Set peer)
-> FetchClientRegistry peer header block m)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> m (StrictTMVar
m
(Tracer m (TraceLabelPeer peer (TraceFetchClientState header)),
STM m (FetchClientPolicy header block m)))
forall (m :: * -> *) a. MonadSTM m => m (StrictTMVar m a)
newEmptyTMVarIO
m (StrictTVar m (Map peer (FetchClientStateVars m header))
-> StrictTVar
m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ()))
-> StrictTVar m (Map peer PeerGSV)
-> StrictTVar m (Map peer (ThreadId m, StrictTMVar m ()))
-> StrictTVar m (Set peer)
-> FetchClientRegistry peer header block m)
-> m (StrictTVar m (Map peer (FetchClientStateVars m header)))
-> m (StrictTVar
m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ()))
-> StrictTVar m (Map peer PeerGSV)
-> StrictTVar m (Map peer (ThreadId m, StrictTMVar m ()))
-> StrictTVar m (Set peer)
-> FetchClientRegistry peer header block m)
forall a b. m (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Map peer (FetchClientStateVars m header)
-> m (StrictTVar m (Map peer (FetchClientStateVars m header)))
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO Map peer (FetchClientStateVars m header)
forall k a. Map k a
Map.empty
m (StrictTVar
m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ()))
-> StrictTVar m (Map peer PeerGSV)
-> StrictTVar m (Map peer (ThreadId m, StrictTMVar m ()))
-> StrictTVar m (Set peer)
-> FetchClientRegistry peer header block m)
-> m (StrictTVar
m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())))
-> m (StrictTVar m (Map peer PeerGSV)
-> StrictTVar m (Map peer (ThreadId m, StrictTMVar m ()))
-> StrictTVar m (Set peer)
-> FetchClientRegistry peer header block m)
forall a b. m (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())
-> m (StrictTVar
m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())))
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())
forall k a. Map k a
Map.empty
m (StrictTVar m (Map peer PeerGSV)
-> StrictTVar m (Map peer (ThreadId m, StrictTMVar m ()))
-> StrictTVar m (Set peer)
-> FetchClientRegistry peer header block m)
-> m (StrictTVar m (Map peer PeerGSV))
-> m (StrictTVar m (Map peer (ThreadId m, StrictTMVar m ()))
-> StrictTVar m (Set peer)
-> FetchClientRegistry peer header block m)
forall a b. m (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Map peer PeerGSV -> m (StrictTVar m (Map peer PeerGSV))
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO Map peer PeerGSV
forall k a. Map k a
Map.empty
m (StrictTVar m (Map peer (ThreadId m, StrictTMVar m ()))
-> StrictTVar m (Set peer)
-> FetchClientRegistry peer header block m)
-> m (StrictTVar m (Map peer (ThreadId m, StrictTMVar m ())))
-> m (StrictTVar m (Set peer)
-> FetchClientRegistry peer header block m)
forall a b. m (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Map peer (ThreadId m, StrictTMVar m ())
-> m (StrictTVar m (Map peer (ThreadId m, StrictTMVar m ())))
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO Map peer (ThreadId m, StrictTMVar m ())
forall k a. Map k a
Map.empty
m (StrictTVar m (Set peer)
-> FetchClientRegistry peer header block m)
-> m (StrictTVar m (Set peer))
-> m (FetchClientRegistry peer header block m)
forall a b. m (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Set peer -> m (StrictTVar m (Set peer))
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO Set peer
forall a. Set a
Set.empty
bracketFetchClient :: forall m a peer header block version.
(MonadFork m, MonadMask m, MonadTimer m, Ord peer)
=> FetchClientRegistry peer header block m
-> version
-> peer
-> (FetchClientContext header block m -> m a)
-> m a
bracketFetchClient :: forall (m :: * -> *) a peer header block version.
(MonadFork m, MonadMask m, MonadTimer m, Ord peer) =>
FetchClientRegistry peer header block m
-> version
-> peer
-> (FetchClientContext header block m -> m a)
-> m a
bracketFetchClient (FetchClientRegistry StrictTMVar
m
(Tracer m (TraceLabelPeer peer (TraceFetchClientState header)),
STM m (FetchClientPolicy header block m))
ctxVar
StrictTVar m (Map peer (FetchClientStateVars m header))
fetchRegistry StrictTVar
m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ()))
syncRegistry StrictTVar m (Map peer PeerGSV)
dqRegistry StrictTVar m (Map peer (ThreadId m, StrictTMVar m ()))
keepRegistry StrictTVar m (Set peer)
dyingRegistry)
version
_version peer
peer FetchClientContext header block m -> m a
action = do
ksVar <- m (StrictTMVar m ())
forall (m :: * -> *) a. MonadSTM m => m (StrictTMVar m a)
newEmptyTMVarIO
bracket (register ksVar) (uncurry (unregister ksVar)) (action . fst)
where
register :: StrictTMVar m ()
-> m ( FetchClientContext header block m
, (ThreadId m, StrictTMVar m ()) )
register :: StrictTMVar m ()
-> m (FetchClientContext header block m,
(ThreadId m, StrictTMVar m ()))
register StrictTMVar m ()
ksVar = do
tid <- m (ThreadId m)
forall (m :: * -> *). MonadThread m => m (ThreadId m)
myThreadId
ctx <- atomically $ do
fr <- readTVar fetchRegistry
check (peer `Map.notMember` fr)
dr <- readTVar dyingRegistry
check (peer `Set.notMember` dr)
(tracer, mkPolicy) <- readTMVar ctxVar
dqPeers <- readTVar dqRegistry
check (peer `Map.member` dqPeers)
modifyTVar keepRegistry $ \Map peer (ThreadId m, StrictTMVar m ())
m ->
Bool
-> Map peer (ThreadId m, StrictTMVar m ())
-> Map peer (ThreadId m, StrictTMVar m ())
forall a. HasCallStack => Bool -> a -> a
assert (peer
peer peer -> Map peer (ThreadId m, StrictTMVar m ()) -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`Map.notMember` Map peer (ThreadId m, StrictTMVar m ())
m) (Map peer (ThreadId m, StrictTMVar m ())
-> Map peer (ThreadId m, StrictTMVar m ()))
-> Map peer (ThreadId m, StrictTMVar m ())
-> Map peer (ThreadId m, StrictTMVar m ())
forall a b. (a -> b) -> a -> b
$
peer
-> (ThreadId m, StrictTMVar m ())
-> Map peer (ThreadId m, StrictTMVar m ())
-> Map peer (ThreadId m, StrictTMVar m ())
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert peer
peer (ThreadId m
tid, StrictTMVar m ()
ksVar) Map peer (ThreadId m, StrictTMVar m ())
m
policy <- mkPolicy
stateVars <- newFetchClientStateVars
modifyTVar fetchRegistry $ \Map peer (FetchClientStateVars m header)
m ->
Bool
-> Map peer (FetchClientStateVars m header)
-> Map peer (FetchClientStateVars m header)
forall a. HasCallStack => Bool -> a -> a
assert (peer
peer peer -> Map peer (FetchClientStateVars m header) -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`Map.notMember` Map peer (FetchClientStateVars m header)
m) (Map peer (FetchClientStateVars m header)
-> Map peer (FetchClientStateVars m header))
-> Map peer (FetchClientStateVars m header)
-> Map peer (FetchClientStateVars m header)
forall a b. (a -> b) -> a -> b
$
peer
-> FetchClientStateVars m header
-> Map peer (FetchClientStateVars m header)
-> Map peer (FetchClientStateVars m header)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert peer
peer FetchClientStateVars m header
stateVars Map peer (FetchClientStateVars m header)
m
return FetchClientContext {
fetchClientCtxTracer = contramap (TraceLabelPeer peer) tracer,
fetchClientCtxPolicy = policy,
fetchClientCtxStateVars = stateVars
}
onException
(atomically $ do
syncclients <- readTVar syncRegistry
case Map.lookup peer syncclients of
Maybe (ThreadId m, StrictTMVar m (), StrictTMVar m ())
Nothing -> STM
m
(FetchClientContext header block m, (ThreadId m, StrictTMVar m ()))
forall a. STM m a
forall (m :: * -> *) a. MonadSTM m => STM m a
retry
Just (ThreadId m
cTid, StrictTMVar m ()
doneVar, StrictTMVar m ()
startVar) -> do
StrictTMVar m () -> () -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTMVar m a -> a -> STM m ()
putTMVar StrictTMVar m ()
startVar ()
StrictTVar m (PeerFetchStatus header)
-> PeerFetchStatus header -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar (FetchClientStateVars m header
-> StrictTVar m (PeerFetchStatus header)
forall (m :: * -> *) header.
FetchClientStateVars m header
-> StrictTVar m (PeerFetchStatus header)
fetchClientStatusVar (FetchClientStateVars m header
-> StrictTVar m (PeerFetchStatus header))
-> FetchClientStateVars m header
-> StrictTVar m (PeerFetchStatus header)
forall a b. (a -> b) -> a -> b
$ FetchClientContext header block m -> FetchClientStateVars m header
forall header block (m :: * -> *).
FetchClientContext header block m -> FetchClientStateVars m header
fetchClientCtxStateVars FetchClientContext header block m
ctx)
(Set (Point header) -> IsIdle -> PeerFetchStatus header
forall header.
Set (Point header) -> IsIdle -> PeerFetchStatus header
PeerFetchStatusReady Set (Point header)
forall a. Set a
Set.empty IsIdle
IsIdle)
(FetchClientContext header block m, (ThreadId m, StrictTMVar m ()))
-> STM
m
(FetchClientContext header block m, (ThreadId m, StrictTMVar m ()))
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return (FetchClientContext header block m
ctx, (ThreadId m
cTid, StrictTMVar m ()
doneVar))
)
(atomically $ do
writeTVar (fetchClientStatusVar $ fetchClientCtxStateVars ctx) PeerFetchStatusShutdown
putTMVar ksVar ()
modifyTVar keepRegistry $ \Map peer (ThreadId m, StrictTMVar m ())
m ->
Bool
-> Map peer (ThreadId m, StrictTMVar m ())
-> Map peer (ThreadId m, StrictTMVar m ())
forall a. HasCallStack => Bool -> a -> a
assert (peer
peer peer -> Map peer (ThreadId m, StrictTMVar m ()) -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`Map.member` Map peer (ThreadId m, StrictTMVar m ())
m) (Map peer (ThreadId m, StrictTMVar m ())
-> Map peer (ThreadId m, StrictTMVar m ()))
-> Map peer (ThreadId m, StrictTMVar m ())
-> Map peer (ThreadId m, StrictTMVar m ())
forall a b. (a -> b) -> a -> b
$
peer
-> Map peer (ThreadId m, StrictTMVar m ())
-> Map peer (ThreadId m, StrictTMVar m ())
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete peer
peer Map peer (ThreadId m, StrictTMVar m ())
m
modifyTVar fetchRegistry $ \Map peer (FetchClientStateVars m header)
m ->
Bool
-> Map peer (FetchClientStateVars m header)
-> Map peer (FetchClientStateVars m header)
forall a. HasCallStack => Bool -> a -> a
assert (peer
peer peer -> Map peer (FetchClientStateVars m header) -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`Map.member` Map peer (FetchClientStateVars m header)
m) (Map peer (FetchClientStateVars m header)
-> Map peer (FetchClientStateVars m header))
-> Map peer (FetchClientStateVars m header)
-> Map peer (FetchClientStateVars m header)
forall a b. (a -> b) -> a -> b
$
peer
-> Map peer (FetchClientStateVars m header)
-> Map peer (FetchClientStateVars m header)
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete peer
peer Map peer (FetchClientStateVars m header)
m
)
unregister :: StrictTMVar m ()
-> FetchClientContext header block m
-> (ThreadId m, StrictTMVar m ())
-> m ()
unregister :: StrictTMVar m ()
-> FetchClientContext header block m
-> (ThreadId m, StrictTMVar m ())
-> m ()
unregister StrictTMVar m ()
ksVar FetchClientContext { fetchClientCtxStateVars :: forall header block (m :: * -> *).
FetchClientContext header block m -> FetchClientStateVars m header
fetchClientCtxStateVars = FetchClientStateVars m header
stateVars }
(ThreadId m
tid, StrictTMVar m ()
doneVar) = ((forall a. m a -> m a) -> m ()) -> m ()
forall b. ((forall a. m a -> m a) -> m b) -> m b
forall (m :: * -> *) b.
MonadMask m =>
((forall a. m a -> m a) -> m b) -> m b
uninterruptibleMask (((forall a. m a -> m a) -> m ()) -> m ())
-> ((forall a. m a -> m a) -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \forall a. m a -> m a
unmask -> do
dead <- do
dieFast <- STM m Bool -> m Bool
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m Bool -> m Bool) -> STM m Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ do
StrictTVar m (PeerFetchStatus header)
-> PeerFetchStatus header -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> a -> STM m ()
writeTVar (FetchClientStateVars m header
-> StrictTVar m (PeerFetchStatus header)
forall (m :: * -> *) header.
FetchClientStateVars m header
-> StrictTVar m (PeerFetchStatus header)
fetchClientStatusVar FetchClientStateVars m header
stateVars) PeerFetchStatus header
forall header. PeerFetchStatus header
PeerFetchStatusShutdown
dr <- StrictTVar m (Set peer) -> STM m (Set peer)
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (Set peer)
dyingRegistry
return $ Set.member peer dr
if dieFast
then do
throwTo tid AsyncCancelled
atomically $ readTMVar doneVar >> cleanup
return True
else return False
if dead
then return ()
else do
res <- onException
(unmask $ timeout deactivateTimeout $ atomically $ readTMVar doneVar)
(
uninterruptibleMask_ $ do
throwTo tid AsyncCancelled
atomically $ readTMVar doneVar >> cleanup
)
case res of
Maybe ()
Nothing -> do
ThreadId m -> AsyncCancelled -> m ()
forall e. Exception e => ThreadId m -> e -> m ()
forall (m :: * -> *) e.
(MonadFork m, Exception e) =>
ThreadId m -> e -> m ()
throwTo ThreadId m
tid AsyncCancelled
AsyncCancelled
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTMVar m () -> STM m ()
forall (m :: * -> *) a. MonadSTM m => StrictTMVar m a -> STM m a
readTMVar StrictTMVar m ()
doneVar STM m () -> STM m () -> STM m ()
forall a b. STM m a -> STM m b -> STM m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> STM m ()
cleanup
Just ()
_ -> STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically STM m ()
cleanup
where
cleanup :: STM m ()
cleanup = do
StrictTVar m (Map peer (FetchClientStateVars m header))
-> (Map peer (FetchClientStateVars m header)
-> Map peer (FetchClientStateVars m header))
-> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> (a -> a) -> STM m ()
modifyTVar StrictTVar m (Map peer (FetchClientStateVars m header))
fetchRegistry ((Map peer (FetchClientStateVars m header)
-> Map peer (FetchClientStateVars m header))
-> STM m ())
-> (Map peer (FetchClientStateVars m header)
-> Map peer (FetchClientStateVars m header))
-> STM m ()
forall a b. (a -> b) -> a -> b
$ \Map peer (FetchClientStateVars m header)
m ->
Bool
-> Map peer (FetchClientStateVars m header)
-> Map peer (FetchClientStateVars m header)
forall a. HasCallStack => Bool -> a -> a
assert (peer
peer peer -> Map peer (FetchClientStateVars m header) -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`Map.member` Map peer (FetchClientStateVars m header)
m) (Map peer (FetchClientStateVars m header)
-> Map peer (FetchClientStateVars m header))
-> Map peer (FetchClientStateVars m header)
-> Map peer (FetchClientStateVars m header)
forall a b. (a -> b) -> a -> b
$
peer
-> Map peer (FetchClientStateVars m header)
-> Map peer (FetchClientStateVars m header)
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete peer
peer Map peer (FetchClientStateVars m header)
m
StrictTMVar m () -> () -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTMVar m a -> a -> STM m ()
putTMVar StrictTMVar m ()
ksVar ()
StrictTVar m (Map peer (ThreadId m, StrictTMVar m ()))
-> (Map peer (ThreadId m, StrictTMVar m ())
-> Map peer (ThreadId m, StrictTMVar m ()))
-> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> (a -> a) -> STM m ()
modifyTVar StrictTVar m (Map peer (ThreadId m, StrictTMVar m ()))
keepRegistry ((Map peer (ThreadId m, StrictTMVar m ())
-> Map peer (ThreadId m, StrictTMVar m ()))
-> STM m ())
-> (Map peer (ThreadId m, StrictTMVar m ())
-> Map peer (ThreadId m, StrictTMVar m ()))
-> STM m ()
forall a b. (a -> b) -> a -> b
$ \Map peer (ThreadId m, StrictTMVar m ())
m ->
Bool
-> Map peer (ThreadId m, StrictTMVar m ())
-> Map peer (ThreadId m, StrictTMVar m ())
forall a. HasCallStack => Bool -> a -> a
assert (peer
peer peer -> Map peer (ThreadId m, StrictTMVar m ()) -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`Map.member` Map peer (ThreadId m, StrictTMVar m ())
m) (Map peer (ThreadId m, StrictTMVar m ())
-> Map peer (ThreadId m, StrictTMVar m ()))
-> Map peer (ThreadId m, StrictTMVar m ())
-> Map peer (ThreadId m, StrictTMVar m ())
forall a b. (a -> b) -> a -> b
$
peer
-> Map peer (ThreadId m, StrictTMVar m ())
-> Map peer (ThreadId m, StrictTMVar m ())
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete peer
peer Map peer (ThreadId m, StrictTMVar m ())
m
bracketSyncWithFetchClient :: forall m a peer header block.
(MonadSTM m, MonadFork m, MonadCatch m,
Ord peer)
=> FetchClientRegistry peer header block m
-> peer
-> m a
-> m a
bracketSyncWithFetchClient :: forall (m :: * -> *) a peer header block.
(MonadSTM m, MonadFork m, MonadCatch m, Ord peer) =>
FetchClientRegistry peer header block m -> peer -> m a -> m a
bracketSyncWithFetchClient (FetchClientRegistry StrictTMVar
m
(Tracer m (TraceLabelPeer peer (TraceFetchClientState header)),
STM m (FetchClientPolicy header block m))
_ctxVar
StrictTVar m (Map peer (FetchClientStateVars m header))
_fetchRegistry StrictTVar
m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ()))
syncRegistry StrictTVar m (Map peer PeerGSV)
_dqRegistry StrictTVar m (Map peer (ThreadId m, StrictTMVar m ()))
_keepRegistry StrictTVar m (Set peer)
_dyingRegistry) peer
peer m a
action = do
doneVar <- m (StrictTMVar m ())
forall (m :: * -> *) a. MonadSTM m => m (StrictTMVar m a)
newEmptyTMVarIO
startVar <- newEmptyTMVarIO
bracket_ (register doneVar startVar) (unregister doneVar) action
where
register :: StrictTMVar m () -> StrictTMVar m () -> m ()
register :: StrictTMVar m () -> StrictTMVar m () -> m ()
register StrictTMVar m ()
doneVar StrictTMVar m ()
startVar = do
tid <- m (ThreadId m)
forall (m :: * -> *). MonadThread m => m (ThreadId m)
myThreadId
atomically $ do
sr <- readTVar syncRegistry
check (peer `Map.notMember` sr)
modifyTVar syncRegistry $ \Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())
m ->
Bool
-> Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())
-> Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())
forall a. HasCallStack => Bool -> a -> a
assert (peer
peer peer
-> Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())
-> Bool
forall k a. Ord k => k -> Map k a -> Bool
`Map.notMember` Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())
m) (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())
-> Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ()))
-> Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())
-> Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())
forall a b. (a -> b) -> a -> b
$
peer
-> (ThreadId m, StrictTMVar m (), StrictTMVar m ())
-> Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())
-> Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert peer
peer (ThreadId m
tid, StrictTMVar m ()
doneVar, StrictTMVar m ()
startVar) Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())
m
onException (atomically $ readTMVar startVar) (unregister doneVar)
unregister :: StrictTMVar m () -> m ()
unregister :: StrictTMVar m () -> m ()
unregister StrictTMVar m ()
doneVar =
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
StrictTMVar m () -> () -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTMVar m a -> a -> STM m ()
putTMVar StrictTMVar m ()
doneVar ()
StrictTVar
m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ()))
-> (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())
-> Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ()))
-> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> (a -> a) -> STM m ()
modifyTVar StrictTVar
m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ()))
syncRegistry ((Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())
-> Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ()))
-> STM m ())
-> (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())
-> Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ()))
-> STM m ()
forall a b. (a -> b) -> a -> b
$ \Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())
m ->
Bool
-> Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())
-> Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())
forall a. HasCallStack => Bool -> a -> a
assert (peer
peer peer
-> Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())
-> Bool
forall k a. Ord k => k -> Map k a -> Bool
`Map.member` Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())
m) (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())
-> Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ()))
-> Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())
-> Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())
forall a b. (a -> b) -> a -> b
$
peer
-> Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())
-> Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete peer
peer Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())
m
bracketKeepAliveClient :: forall m a peer header block.
(MonadSTM m, MonadFork m, MonadMask m, Ord peer)
=> FetchClientRegistry peer header block m
-> peer
-> (StrictTVar m (Map peer PeerGSV) -> m a)
-> m a
bracketKeepAliveClient :: forall (m :: * -> *) a peer header block.
(MonadSTM m, MonadFork m, MonadMask m, Ord peer) =>
FetchClientRegistry peer header block m
-> peer -> (StrictTVar m (Map peer PeerGSV) -> m a) -> m a
bracketKeepAliveClient(FetchClientRegistry StrictTMVar
m
(Tracer m (TraceLabelPeer peer (TraceFetchClientState header)),
STM m (FetchClientPolicy header block m))
_ctxVar
StrictTVar m (Map peer (FetchClientStateVars m header))
_fetchRegistry StrictTVar
m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ()))
_syncRegistry StrictTVar m (Map peer PeerGSV)
dqRegistry StrictTVar m (Map peer (ThreadId m, StrictTMVar m ()))
keepRegistry StrictTVar m (Set peer)
dyingRegistry) peer
peer StrictTVar m (Map peer PeerGSV) -> m a
action = do
m () -> m () -> m a -> m a
forall a b c. m a -> m b -> m c -> m c
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> m b -> m c -> m c
bracket_ m ()
register m ()
unregister (StrictTVar m (Map peer PeerGSV) -> m a
action StrictTVar m (Map peer PeerGSV)
dqRegistry)
where
register :: m ()
register :: m ()
register =
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
dr <- StrictTVar m (Map peer PeerGSV) -> STM m (Map peer PeerGSV)
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (Map peer PeerGSV)
dqRegistry
check (peer `Map.notMember` dr)
modifyTVar dqRegistry $ \Map peer PeerGSV
m ->
Bool -> Map peer PeerGSV -> Map peer PeerGSV
forall a. HasCallStack => Bool -> a -> a
assert (peer
peer peer -> Map peer PeerGSV -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`Map.notMember` Map peer PeerGSV
m) (Map peer PeerGSV -> Map peer PeerGSV)
-> Map peer PeerGSV -> Map peer PeerGSV
forall a b. (a -> b) -> a -> b
$
peer -> PeerGSV -> Map peer PeerGSV -> Map peer PeerGSV
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert peer
peer PeerGSV
defaultGSV Map peer PeerGSV
m
unregister :: m ()
unregister :: m ()
unregister = m () -> m ()
forall a. m a -> m a
forall (m :: * -> *) a. MonadMask m => m a -> m a
uninterruptibleMask_ (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
fetchclient_m <- STM m (Maybe (ThreadId m, StrictTMVar m ()))
-> m (Maybe (ThreadId m, StrictTMVar m ()))
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (Maybe (ThreadId m, StrictTMVar m ()))
-> m (Maybe (ThreadId m, StrictTMVar m ())))
-> STM m (Maybe (ThreadId m, StrictTMVar m ()))
-> m (Maybe (ThreadId m, StrictTMVar m ()))
forall a b. (a -> b) -> a -> b
$ do
fetchclients <- StrictTVar m (Map peer (ThreadId m, StrictTMVar m ()))
-> STM m (Map peer (ThreadId m, StrictTMVar m ()))
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (Map peer (ThreadId m, StrictTMVar m ()))
keepRegistry
case Map.lookup peer fetchclients of
Maybe (ThreadId m, StrictTMVar m ())
Nothing -> do
StrictTVar m (Map peer PeerGSV)
-> (Map peer PeerGSV -> Map peer PeerGSV) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> (a -> a) -> STM m ()
modifyTVar StrictTVar m (Map peer PeerGSV)
dqRegistry ((Map peer PeerGSV -> Map peer PeerGSV) -> STM m ())
-> (Map peer PeerGSV -> Map peer PeerGSV) -> STM m ()
forall a b. (a -> b) -> a -> b
$ \Map peer PeerGSV
m ->
Bool -> Map peer PeerGSV -> Map peer PeerGSV
forall a. HasCallStack => Bool -> a -> a
assert (peer
peer peer -> Map peer PeerGSV -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`Map.member` Map peer PeerGSV
m) (Map peer PeerGSV -> Map peer PeerGSV)
-> Map peer PeerGSV -> Map peer PeerGSV
forall a b. (a -> b) -> a -> b
$
peer -> Map peer PeerGSV -> Map peer PeerGSV
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete peer
peer Map peer PeerGSV
m
Maybe (ThreadId m, StrictTMVar m ())
-> STM m (Maybe (ThreadId m, StrictTMVar m ()))
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (ThreadId m, StrictTMVar m ())
forall a. Maybe a
Nothing
Just (ThreadId m, StrictTMVar m ())
rc -> do
StrictTVar m (Set peer) -> (Set peer -> Set peer) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> (a -> a) -> STM m ()
modifyTVar StrictTVar m (Set peer)
dyingRegistry ((Set peer -> Set peer) -> STM m ())
-> (Set peer -> Set peer) -> STM m ()
forall a b. (a -> b) -> a -> b
$ \Set peer
s ->
Bool -> Set peer -> Set peer
forall a. HasCallStack => Bool -> a -> a
assert (peer
peer peer -> Set peer -> Bool
forall a. Ord a => a -> Set a -> Bool
`Set.notMember` Set peer
s) (Set peer -> Set peer) -> Set peer -> Set peer
forall a b. (a -> b) -> a -> b
$
peer -> Set peer -> Set peer
forall a. Ord a => a -> Set a -> Set a
Set.insert peer
peer Set peer
s
Maybe (ThreadId m, StrictTMVar m ())
-> STM m (Maybe (ThreadId m, StrictTMVar m ()))
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe (ThreadId m, StrictTMVar m ())
-> STM m (Maybe (ThreadId m, StrictTMVar m ())))
-> Maybe (ThreadId m, StrictTMVar m ())
-> STM m (Maybe (ThreadId m, StrictTMVar m ()))
forall a b. (a -> b) -> a -> b
$ (ThreadId m, StrictTMVar m ())
-> Maybe (ThreadId m, StrictTMVar m ())
forall a. a -> Maybe a
Just (ThreadId m, StrictTMVar m ())
rc
case fetchclient_m of
Maybe (ThreadId m, StrictTMVar m ())
Nothing -> () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just (ThreadId m
tid, StrictTMVar m ()
doneVar) -> do
ThreadId m -> AsyncCancelled -> m ()
forall e. Exception e => ThreadId m -> e -> m ()
forall (m :: * -> *) e.
(MonadFork m, Exception e) =>
ThreadId m -> e -> m ()
throwTo ThreadId m
tid AsyncCancelled
AsyncCancelled
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
StrictTMVar m () -> STM m ()
forall (m :: * -> *) a. MonadSTM m => StrictTMVar m a -> STM m a
readTMVar StrictTMVar m ()
doneVar
StrictTVar m (Map peer PeerGSV)
-> (Map peer PeerGSV -> Map peer PeerGSV) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> (a -> a) -> STM m ()
modifyTVar StrictTVar m (Map peer PeerGSV)
dqRegistry ((Map peer PeerGSV -> Map peer PeerGSV) -> STM m ())
-> (Map peer PeerGSV -> Map peer PeerGSV) -> STM m ()
forall a b. (a -> b) -> a -> b
$ \Map peer PeerGSV
m ->
Bool -> Map peer PeerGSV -> Map peer PeerGSV
forall a. HasCallStack => Bool -> a -> a
assert (peer
peer peer -> Map peer PeerGSV -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`Map.member` Map peer PeerGSV
m) (Map peer PeerGSV -> Map peer PeerGSV)
-> Map peer PeerGSV -> Map peer PeerGSV
forall a b. (a -> b) -> a -> b
$
peer -> Map peer PeerGSV -> Map peer PeerGSV
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete peer
peer Map peer PeerGSV
m
StrictTVar m (Set peer) -> (Set peer -> Set peer) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> (a -> a) -> STM m ()
modifyTVar StrictTVar m (Set peer)
dyingRegistry ((Set peer -> Set peer) -> STM m ())
-> (Set peer -> Set peer) -> STM m ()
forall a b. (a -> b) -> a -> b
$ \Set peer
s ->
Bool -> Set peer -> Set peer
forall a. HasCallStack => Bool -> a -> a
assert (peer
peer peer -> Set peer -> Bool
forall a. Ord a => a -> Set a -> Bool
`Set.member` Set peer
s) (Set peer -> Set peer) -> Set peer -> Set peer
forall a b. (a -> b) -> a -> b
$
peer -> Set peer -> Set peer
forall a. Ord a => a -> Set a -> Set a
Set.delete peer
peer Set peer
s
setFetchClientContext :: MonadSTM m
=> FetchClientRegistry peer header block m
-> Tracer m (TraceLabelPeer peer (TraceFetchClientState header))
-> STM m (FetchClientPolicy header block m)
-> m ()
setFetchClientContext :: forall (m :: * -> *) peer header block.
MonadSTM m =>
FetchClientRegistry peer header block m
-> Tracer m (TraceLabelPeer peer (TraceFetchClientState header))
-> STM m (FetchClientPolicy header block m)
-> m ()
setFetchClientContext (FetchClientRegistry StrictTMVar
m
(Tracer m (TraceLabelPeer peer (TraceFetchClientState header)),
STM m (FetchClientPolicy header block m))
ctxVar StrictTVar m (Map peer (FetchClientStateVars m header))
_ StrictTVar
m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ()))
_ StrictTVar m (Map peer PeerGSV)
_ StrictTVar m (Map peer (ThreadId m, StrictTMVar m ()))
_ StrictTVar m (Set peer)
_) Tracer m (TraceLabelPeer peer (TraceFetchClientState header))
tracer STM m (FetchClientPolicy header block m)
mkPolicy =
STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
ok <- StrictTMVar
m
(Tracer m (TraceLabelPeer peer (TraceFetchClientState header)),
STM m (FetchClientPolicy header block m))
-> (Tracer m (TraceLabelPeer peer (TraceFetchClientState header)),
STM m (FetchClientPolicy header block m))
-> STM m Bool
forall (m :: * -> *) a.
MonadSTM m =>
StrictTMVar m a -> a -> STM m Bool
tryPutTMVar StrictTMVar
m
(Tracer m (TraceLabelPeer peer (TraceFetchClientState header)),
STM m (FetchClientPolicy header block m))
ctxVar (Tracer m (TraceLabelPeer peer (TraceFetchClientState header))
tracer, STM m (FetchClientPolicy header block m)
mkPolicy)
unless ok $ error "setFetchClientContext: called more than once"
readFetchClientsStatus :: MonadSTM m
=> FetchClientRegistry peer header block m
-> STM m (Map peer (PeerFetchStatus header))
readFetchClientsStatus :: forall (m :: * -> *) peer header block.
MonadSTM m =>
FetchClientRegistry peer header block m
-> STM m (Map peer (PeerFetchStatus header))
readFetchClientsStatus (FetchClientRegistry StrictTMVar
m
(Tracer m (TraceLabelPeer peer (TraceFetchClientState header)),
STM m (FetchClientPolicy header block m))
_ StrictTVar m (Map peer (FetchClientStateVars m header))
registry StrictTVar
m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ()))
_ StrictTVar m (Map peer PeerGSV)
_ StrictTVar m (Map peer (ThreadId m, StrictTMVar m ()))
_ StrictTVar m (Set peer)
_) =
StrictTVar m (Map peer (FetchClientStateVars m header))
-> STM m (Map peer (FetchClientStateVars m header))
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (Map peer (FetchClientStateVars m header))
registry STM m (Map peer (FetchClientStateVars m header))
-> (Map peer (FetchClientStateVars m header)
-> STM m (Map peer (PeerFetchStatus header)))
-> STM m (Map peer (PeerFetchStatus header))
forall a b. STM m a -> (a -> STM m b) -> STM m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (FetchClientStateVars m header -> STM m (PeerFetchStatus header))
-> Map peer (FetchClientStateVars m header)
-> STM m (Map peer (PeerFetchStatus header))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Map peer a -> f (Map peer b)
traverse (StrictTVar m (PeerFetchStatus header)
-> STM m (PeerFetchStatus header)
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar (StrictTVar m (PeerFetchStatus header)
-> STM m (PeerFetchStatus header))
-> (FetchClientStateVars m header
-> StrictTVar m (PeerFetchStatus header))
-> FetchClientStateVars m header
-> STM m (PeerFetchStatus header)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FetchClientStateVars m header
-> StrictTVar m (PeerFetchStatus header)
forall (m :: * -> *) header.
FetchClientStateVars m header
-> StrictTVar m (PeerFetchStatus header)
fetchClientStatusVar)
readFetchClientsStateVars :: MonadSTM m
=> FetchClientRegistry peer header block m
-> STM m (Map peer (FetchClientStateVars m header))
readFetchClientsStateVars :: forall (m :: * -> *) peer header block.
MonadSTM m =>
FetchClientRegistry peer header block m
-> STM m (Map peer (FetchClientStateVars m header))
readFetchClientsStateVars (FetchClientRegistry StrictTMVar
m
(Tracer m (TraceLabelPeer peer (TraceFetchClientState header)),
STM m (FetchClientPolicy header block m))
_ StrictTVar m (Map peer (FetchClientStateVars m header))
registry StrictTVar
m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ()))
_ StrictTVar m (Map peer PeerGSV)
_ StrictTVar m (Map peer (ThreadId m, StrictTMVar m ()))
_ StrictTVar m (Set peer)
_) = StrictTVar m (Map peer (FetchClientStateVars m header))
-> STM m (Map peer (FetchClientStateVars m header))
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (Map peer (FetchClientStateVars m header))
registry
readPeerGSVs :: forall block header m peer.
( MonadSTM m, Ord peer)
=> FetchClientRegistry peer header block m
-> STM m (Map peer PeerGSV)
readPeerGSVs :: forall block header (m :: * -> *) peer.
(MonadSTM m, Ord peer) =>
FetchClientRegistry peer header block m -> STM m (Map peer PeerGSV)
readPeerGSVs (FetchClientRegistry StrictTMVar
m
(Tracer m (TraceLabelPeer peer (TraceFetchClientState header)),
STM m (FetchClientPolicy header block m))
_ StrictTVar m (Map peer (FetchClientStateVars m header))
_ StrictTVar
m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ()))
_ StrictTVar m (Map peer PeerGSV)
dqRegistry StrictTVar m (Map peer (ThreadId m, StrictTMVar m ()))
keepRegistry StrictTVar m (Set peer)
_) = do
dr <- StrictTVar m (Map peer PeerGSV) -> STM m (Map peer PeerGSV)
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (Map peer PeerGSV)
dqRegistry
kr <- readTVar keepRegistry
return $ Map.intersection dr kr