{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Ouroboros.Network.MockNode where
import Control.Exception (assert)
import Control.Monad
import Data.Hashable
import Data.List qualified as List
import Data.Maybe (catMaybes)
import Data.Tuple (swap)
import GHC.Generics (Generic)
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadSay
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (nullTracer)
import Network.TypedProtocol.Codec
import Network.TypedProtocol.Peer.Server
import Ouroboros.Network.Block
import Ouroboros.Network.Channel
import Ouroboros.Network.Driver
import Ouroboros.Network.Util.ShowProxy
import Ouroboros.Network.Point (WithOrigin (At))
import Ouroboros.Network.Protocol.ChainSync.Client
import Ouroboros.Network.Protocol.ChainSync.Codec (codecChainSyncId)
import Ouroboros.Network.Protocol.ChainSync.Examples
import Ouroboros.Network.Protocol.ChainSync.Server
import Ouroboros.Network.Protocol.ChainSync.Type
import Ouroboros.Network.Mock.Chain (Chain (..))
import Ouroboros.Network.Mock.Chain qualified as Chain
import Ouroboros.Network.Mock.ConcreteBlock hiding (fixupBlock)
import Ouroboros.Network.Mock.ConcreteBlock qualified as Concrete
import Ouroboros.Network.Mock.ProducerState (ChainProducerState (..),
initChainProducerState, producerChain, switchFork)
-- | Identifier of a node in the mock network.
--
-- A node is either a core node or a relay node (see 'coreNode' and
-- 'relayNode'); each constructor carries an 'Int' tag.  The derived 'Ord'
-- instance orders every 'CoreId' before every 'RelayId'.
data NodeId = CoreId Int
            | RelayId Int
  deriving (Eq, Ord, Show, Generic)
-- No methods are given, so the class defaults are used (presumably the
-- 'Generic'-based ones, which is why 'NodeId' derives 'Generic' -- confirm
-- against the @hashable@ documentation).
instance Hashable NodeId
-- | Chain selection loop operating on a shared 'ChainProducerState'.
--
-- Runs forever.  Each iteration is a single STM transaction which:
--
-- 1. reads every candidate chain variable and the current producer state;
-- 2. folds 'Chain.selectChain' over the available (@Just@) candidates, using
--    the current chain as the initial accumulator;
-- 3. if the head point of the selected chain equals the head point of the
--    current chain, 'retry's (so the transaction blocks until one of the
--    variables it read changes); otherwise it stores
--    @'switchFork' chain' cps@ in the producer state variable.
--
-- This function never returns; callers are expected to fork it.
longestChainSelection :: forall block m.
                         ( HasHeader block
                         , MonadSTM m
                         )
                      => [StrictTVar m (Maybe (Chain block))]
                      -- ^ candidate chains, @Nothing@ when no candidate is
                      -- available
                      -> StrictTVar m (ChainProducerState block)
                      -- ^ producer state holding the currently selected chain
                      -> m ()
longestChainSelection candidateChainVars cpsVar =
    forever (atomically updateCurrentChain)
  where
    updateCurrentChain :: STM m ()
    updateCurrentChain = do
      candidateChains <- mapM readTVar candidateChainVars
      cps@ChainProducerState{chainState = chain} <- readTVar cpsVar
      let -- the current chain is the seed of the fold, candidates are
          -- offered to 'Chain.selectChain' one by one, left to right
          chain' = List.foldl' Chain.selectChain chain (catMaybes candidateChains)
      if Chain.headPoint chain' == Chain.headPoint chain
        then retry
        else writeTVar cpsVar (switchFork chain' cps)
-- | Validation loop for a single upstream peer chain.
--
-- Keeps a private variable with the head point of the last peer chain it
-- processed (initially 'genesisPoint').  Each iteration is one STM
-- transaction which blocks (via 'check') until the head point of the peer
-- chain differs from the recorded one, records the new head point, and then
-- publishes the result to the candidate variable: @Just peerChain@ when
-- 'Chain.valid' holds, @Nothing@ otherwise.
--
-- This function never returns; callers are expected to fork it.
chainValidation :: forall block m. (HasFullHeader block, MonadSTM m)
                => StrictTVar m (Chain block)
                -- ^ chain received from the peer (read only)
                -> StrictTVar m (Maybe (Chain block))
                -- ^ candidate chain published after validation
                -> m ()
chainValidation peerChainVar candidateChainVar = do
    st <- atomically (newTVar genesisPoint)
    forever (atomically (update st))
  where
    update :: StrictTVar m (Point block) -> STM m ()
    update stateVar = do
      peerChain      <- readTVar peerChainVar
      candidatePoint <- readTVar stateVar
      -- only proceed once the peer chain head has actually moved
      check (Chain.headPoint peerChain /= candidatePoint)
      writeTVar stateVar (Chain.headPoint peerChain)
      let candidateChain | Chain.valid peerChain = Just peerChain
                         | otherwise             = Nothing
      writeTVar candidateChainVar candidateChain
-- | The chain-sync channels attached to a single node.
--
-- 'relayNode' runs a chain-sync client on every channel in 'consumerChans'
-- and a chain-sync server on every channel in 'producerChans'.
data NodeChannels m block tip = NodeChannels
  { consumerChans :: [Channel m (AnyMessage (ChainSync block (Point block) tip))]
    -- ^ channels on which the node plays the consumer (client) role
  , producerChans :: [Channel m (AnyMessage (ChainSync block (Point block) tip))]
    -- ^ channels on which the node plays the producer (server) role
  }
-- | Combine two sets of channels by concatenating the consumer channel lists
-- and the producer channel lists, left operand first.
instance Semigroup (NodeChannels m block tip) where
  NodeChannels c1 p1 <> NodeChannels c2 p2 = NodeChannels (c1 ++ c2) (p1 ++ p2)
-- | The empty set of channels: no consumer and no producer channels.
instance Monoid (NodeChannels m block tip) where
  mempty = NodeChannels [] []
-- | Create the channels for a one-directional subscription between two
-- nodes.
--
-- A single pair of connected channels is created with
-- 'createConnectedChannels'.  The first component of the result holds one
-- end as its only producer channel, wrapped with 'delayChannel' using the
-- first delay; the second component holds the other end as its only consumer
-- channel, wrapped with 'delayChannel' using the second delay.
createOneWaySubscriptionChannels
  :: forall block tip m.
     ( MonadSTM m
     , MonadDelay m
     )
  => DiffTime
  -- ^ delay applied to the producer side channel
  -> DiffTime
  -- ^ delay applied to the consumer side channel
  -> m (NodeChannels m block tip, NodeChannels m block tip)
createOneWaySubscriptionChannels trDelay1 trDelay2 = do
  (cr, rc) <- createConnectedChannels
  return
    ( NodeChannels
        { consumerChans = []
        , producerChans = [delayChannel trDelay1 cr]
        }
    , NodeChannels
        { consumerChans = [delayChannel trDelay2 rc]
        , producerChans = []
        }
    )
-- | Create the channels for a bidirectional subscription between two nodes.
--
-- Two one-way subscriptions are created, one in each direction, and merged
-- component-wise.  As a result each of the two returned 'NodeChannels' has
-- exactly one consumer channel and one producer channel; both channels of
-- the first component are wrapped with the first delay and both channels of
-- the second component with the second delay.
createTwoWaySubscriptionChannels
  :: forall block tip m.
     ( MonadDelay m
     , MonadLabelledSTM m
     , MonadTimer m
     )
  => DiffTime
  -- ^ delay applied to the channels of the first node
  -> DiffTime
  -- ^ delay applied to the channels of the second node
  -> m (NodeChannels m block tip, NodeChannels m block tip)
createTwoWaySubscriptionChannels trDelay1 trDelay2 = do
  r12 <- createOneWaySubscriptionChannels trDelay1 trDelay2
  r21 <- createOneWaySubscriptionChannels trDelay2 trDelay1
  -- 'swap' puts the second subscription's consumer end on the first node
  -- (and its producer end on the second node) before merging pairwise
  return $ r12 <> swap r21
-- | Generate the given blocks one by one, paced by their slot numbers.
--
-- A thread is forked which, for every block in the list, waits for
-- @slotDuration * (slot of the block - slot of the previous block)@ (the
-- previous slot is taken to be @0@ for the first block) and then writes
-- @Just block@ to a bounded queue of capacity one.  Once the list is
-- exhausted it writes a single @Nothing@ and terminates.
--
-- The returned STM action reads the next item from that queue.
--
-- The slot difference is computed on the unsigned 'unSlotNo' values, so the
-- blocks must be given in non-decreasing slot order; a decreasing slot would
-- make the subtraction wrap around and produce an enormous delay.
blockGenerator :: forall block m.
                  ( HasHeader block
                  , MonadDelay m
                  , MonadSTM m
                  , MonadFork m
                  )
               => DiffTime
               -- ^ slot duration
               -> [block]
               -- ^ blocks to generate, in non-decreasing slot order
               -> m (STM m (Maybe block))
               -- ^ action yielding the next block, or @Nothing@ once the
               -- list has been exhausted
blockGenerator slotDuration chain = do
    -- a queue of size one: the generator blocks until the previous item
    -- has been consumed
    var <- atomically (newTBQueue 1)
    void $ forkIO $ go var Nothing chain
    return (readTBQueue var)
  where
    go :: StrictTBQueue m (Maybe block) -> Maybe SlotNo -> [block] -> m ()
    go var _ [] =
      -- signal the end of the stream
      atomically (writeTBQueue var Nothing)
    go var mslot (b : bs) = do
      let slot  = blockSlot b
          -- number of slots elapsed since the previously generated block
          delay = unSlotNo slot - maybe 0 unSlotNo mslot
      threadDelay (slotDuration * fromIntegral delay)
      atomically (writeTBQueue var (Just b))
      go var (Just slot) bs
-- | Record every change of a node's produced chain in a shared probe.
--
-- Keeps a private variable with the head point of the last chain it
-- recorded (initially 'genesisPoint').  Each iteration is one STM
-- transaction which blocks (via 'check') until the head point of
-- @'producerChain' cps@ differs from the recorded one, records the new head
-- point and prepends @(nid, chain)@ to the probe list.
--
-- This function never returns; callers are expected to fork it.
observeChainProducerState
  :: forall m block.
     ( HasHeader block
     , MonadSTM m
     )
  => NodeId
  -- ^ identifier stored alongside each observed chain
  -> StrictTVar m [(NodeId, Chain block)]
  -- ^ probe: observations, most recent first
  -> StrictTVar m (ChainProducerState block)
  -- ^ producer state being observed
  -> m ()
observeChainProducerState nid probe cpsVar = do
    st <- atomically (newTVar genesisPoint)
    forever (update st)
  where
    update :: StrictTVar m (Point block) -> m ()
    update stateVar = atomically $ do
      chain    <- producerChain <$> readTVar cpsVar
      curPoint <- readTVar stateVar
      -- only proceed once the produced chain head has actually moved
      check (Chain.headPoint chain /= curPoint)
      writeTVar stateVar (Chain.headPoint chain)
      modifyTVar probe ((nid, chain):)
-- | Identifier of a consumer: a 'NodeId' together with an 'Int'
-- (presumably the index of the consumer within that node -- confirm against
-- the call sites).
data ConsumerId = ConsumerId NodeId Int
  deriving (Eq, Ord, Show)
-- | Identifier of a producer: a 'NodeId' together with an 'Int'
-- (presumably the index of the producer within that node -- confirm against
-- the call sites).
data ProducerId = ProducerId NodeId Int
  deriving (Eq, Ord, Show)
-- | Fork the threads which make up the relay kernel.
--
-- For every upstream chain variable a fresh candidate variable (initially
-- @Nothing@) is created and a 'chainValidation' thread is forked connecting
-- the two.  Finally a single 'longestChainSelection' thread is forked which
-- selects among all candidates and updates the producer state.
--
-- The forked thread ids are discarded; this function returns as soon as all
-- threads have been forked.
forkRelayKernel :: forall block m.
                   ( HasFullHeader block
                   , MonadSTM m
                   , MonadFork m
                   )
                => [StrictTVar m (Chain block)]
                -- ^ upstream (peer) chains
                -> StrictTVar m (ChainProducerState block)
                -- ^ producer state updated by chain selection
                -> m ()
forkRelayKernel upstream cpsVar = do
  -- one candidate chain variable per upstream peer
  candidateChainVars <- replicateM (length upstream) (atomically (newTVar Nothing))
  -- chain validation threads, one per upstream peer
  zipWithM_
    (\chain cchain -> forkIO $ chainValidation chain cchain)
    upstream
    candidateChainVars
  -- chain selection thread
  void $ forkIO $ longestChainSelection candidateChainVars cpsVar
-- | Start a relay node.
--
-- * For every channel in 'consumerChans' a chain-sync client is forked
--   ('chainSyncClientExample' with 'pureClient'), which maintains the peer's
--   chain in a fresh variable starting from 'Genesis'.
-- * A 'ChainProducerState' variable is created from the initial chain and
--   the relay kernel is forked on top of the peer chain variables
--   (see 'forkRelayKernel').
-- * For every channel in 'producerChans' a chain-sync server is forked
--   ('chainSyncServerExample'); all servers share the same producer state
--   variable.
--
-- All peers are run with 'nullTracer' and the identity codec
-- 'codecChainSyncId'; their results are discarded.  The node identifier
-- argument is currently unused.
--
-- Returns the producer state variable of the node.
relayNode :: forall m block.
             ( MonadFork m
             , MonadTimer m
             , MonadThrow m
             , MonadSay m
             , HasFullHeader block
             , Show block
             , ShowProxy block
             )
          => NodeId
          -- ^ node identifier (unused)
          -> Chain block
          -- ^ initial chain of the node
          -> NodeChannels m block (Tip block)
          -- ^ channels connecting the node to its peers
          -> m (StrictTVar m (ChainProducerState block))
relayNode _nid initChain chans = do
  -- Mutable state
  -- 1. input chains, one per consumer channel
  upstream <- zipWithM startConsumer [0..] (consumerChans chans)
  -- 2. ChainProducerState
  cpsVar <- atomically $ newTVar (initChainProducerState initChain)

  forkRelayKernel upstream cpsVar

  -- producers which share the 'ChainProducerState'
  let producer = chainSyncServerPeer (chainSyncServerExample () cpsVar id)
  zipWithM_ (startProducer producer) [0..] (producerChans chans)

  return cpsVar
  where
    -- Fork a chain-sync client on the channel and return the variable in
    -- which it maintains the peer's chain.  The index is unused.
    startConsumer :: Int
                  -> Channel m (AnyMessage (ChainSync block (Point block) (Tip block)))
                  -> m (StrictTVar m (Chain block))
    startConsumer _cid channel = do
      chainVar <- atomically $ newTVar Genesis
      let consumer = chainSyncClientPeer (chainSyncClientExample chainVar pureClient)
      void $ forkIO $ void $ runPeer nullTracer codecChainSyncId channel consumer
      return chainVar

    -- Fork the given chain-sync server on the channel.  The index is unused.
    startProducer :: Server (ChainSync block (Point block) (Tip block)) NonPipelined StIdle m ()
                  -> Int
                  -> Channel m (AnyMessage (ChainSync block (Point block) (Tip block)))
                  -> m ()
    startProducer producer _pid channel =
      void $ forkIO $ void $ runPeer nullTracer codecChainSyncId channel producer
-- | Fork the block-producing part of a core node.
--
-- Starts a 'blockGenerator' for the given blocks and forks a thread which
-- repeatedly takes the next generated block and, in the same STM
-- transaction, extends the current chain of the producer state with it
-- (after passing it through the fix-up function) and stores the result with
-- 'switchFork'.  The thread terminates when the generator yields @Nothing@.
--
-- Calls 'error' if a generated block's slot is strictly before the slot of
-- the current chain head.
forkCoreKernel :: forall block m.
                  ( HasFullHeader block
                  , MonadDelay m
                  , MonadLabelledSTM m
                  , MonadFork m
                  , MonadTimer m
                  )
               => DiffTime
               -- ^ slot duration
               -> [block]
               -- ^ blocks to produce; see 'blockGenerator' for the
               -- ordering requirement
               -> (Chain block -> block -> block)
               -- ^ fix-up applied to each block, given the current chain,
               -- before it is added
               -> StrictTVar m (ChainProducerState block)
               -- ^ producer state to extend
               -> m ()
forkCoreKernel slotDuration gchain fixupBlock cpsVar = do
  getBlock <- blockGenerator slotDuration gchain
  void $ forkIO $ applyGeneratedBlock getBlock

  where
    -- Apply generated blocks until the generator signals the end of the
    -- stream.  Taking the block and updating the chain happen atomically.
    applyGeneratedBlock
      :: STM m (Maybe block)
      -> m ()
    applyGeneratedBlock getBlock = do
      cont <- atomically $ do
        mblock <- getBlock
        case mblock of
          Nothing    -> return False
          Just block -> do
            cps@ChainProducerState{chainState = chain} <- readTVar cpsVar
            writeTVar cpsVar (switchFork (addBlock chain block) cps)
            return True
      when cont $ applyGeneratedBlock getBlock

    -- Add a (fixed-up) block on top of the chain, refusing blocks whose slot
    -- is before the slot of the chain head.
    addBlock :: Chain block -> block -> Chain block
    addBlock c b =
      case At (blockSlot b) `compare` Chain.headSlot c of
        LT -> error "blockGenerator invariant vaiolation: generated block is for slot in the past"
        _  -> let r = Chain.addBlock (fixupBlock c b) c in
              assert (Chain.valid r) r
-- | Start a core node: a relay node which also produces blocks.
--
-- The node is started as a 'relayNode' from the 'Genesis' chain, and then
-- 'forkCoreKernel' is used to add the given blocks to its chain, paced by
-- the slot duration.  Each block is fixed up against the anchor of the
-- current chain head with 'Concrete.fixupBlock' before it is added.
--
-- Returns the producer state variable of the node.
coreNode :: forall m.
            ( MonadDelay m
            , MonadLabelledSTM m
            , MonadFork m
            , MonadThrow m
            , MonadTimer m
            , MonadSay m
            )
         => NodeId
         -- ^ node identifier, passed on to 'relayNode'
         -> DiffTime
         -- ^ slot duration
         -> [Block]
         -- ^ blocks this node produces
         -> NodeChannels m Block (Tip Block)
         -- ^ channels connecting the node to its peers
         -> m (StrictTVar m (ChainProducerState Block))
coreNode nid slotDuration gchain chans = do
  cpsVar <- relayNode nid Genesis chans
  forkCoreKernel slotDuration gchain fixupBlock cpsVar
  return cpsVar
  where
    -- fix the block up so that it fits on top of the current chain head
    fixupBlock :: Chain Block -> Block -> Block
    fixupBlock c = Concrete.fixupBlock (Chain.headAnchor c)