{-# LANGUAGE DataKinds           #-}
{-# LANGUAGE DeriveGeneric       #-}
{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE KindSignatures      #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE PolyKinds           #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- | A mock (and naive) node implementation.  It only implements the @chain-sync@
-- protocol using the 'Ouroboros.Network.Mock.Chain' module.  The module
-- is used to build tests on randomly generated graphs of nodes, see
-- 'Test.Ouroboros.Network.MockNoder'.
--
-- A historical note: this was the very first node implementation that pre-dates
-- 'typed-protocols'.
--
module Ouroboros.Network.MockNode where

import Control.Exception (assert)
import Control.Monad
import Data.Hashable
import Data.List qualified as List
import Data.Maybe (catMaybes)
import Data.Tuple (swap)
import GHC.Generics (Generic)

import Control.Concurrent.Class.MonadSTM.Strict
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadSay
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (nullTracer)

import Network.TypedProtocol.Codec
import Network.TypedProtocol.Peer.Server

import Ouroboros.Network.Block
import Ouroboros.Network.Channel
import Ouroboros.Network.Driver
import Ouroboros.Network.Util.ShowProxy

-- TODO Should this be imported here
import Ouroboros.Network.Point (WithOrigin (At))
import Ouroboros.Network.Protocol.ChainSync.Client
import Ouroboros.Network.Protocol.ChainSync.Codec (codecChainSyncId)
import Ouroboros.Network.Protocol.ChainSync.Examples
import Ouroboros.Network.Protocol.ChainSync.Server
import Ouroboros.Network.Protocol.ChainSync.Type

import Ouroboros.Network.Mock.Chain (Chain (..))
import Ouroboros.Network.Mock.Chain qualified as Chain
import Ouroboros.Network.Mock.ConcreteBlock hiding (fixupBlock)
import Ouroboros.Network.Mock.ConcreteBlock qualified as Concrete
import Ouroboros.Network.Mock.ProducerState (ChainProducerState (..),
           initChainProducerState, producerChain, switchFork)

-- | Identifier of a simulated node: either a core node or a relay node, each
-- tagged with an 'Int'.
--
-- 'Generic' is derived so that generic-based instances can be defined for
-- this type.
data NodeId = CoreId Int
            | RelayId Int
  deriving (Eq, Ord, Show, Generic)

-- | 'Hashable' instance with no explicit method definitions: it relies on the
-- class defaults, which work through the derived 'Generic' instance of
-- 'NodeId'.
instance Hashable NodeId -- let generic instance do the job

-- |
-- State-full chain selection (@'ChainProducerState'@).
--
-- Runs forever.  Each iteration is a single STM transaction which reads all
-- candidate chains (ignoring the ones which are 'Nothing'), folds them
-- together with the current chain using 'Chain.selectChain', and:
--
-- * blocks (via 'retry') if the selected chain has the same head point as
--   the current chain;
-- * otherwise switches the producer state to the selected chain with
--   'switchFork'.
longestChainSelection :: forall block m.
                         ( HasHeader block
                         , MonadSTM m
                         )
                      => [StrictTVar m (Maybe (Chain block))]
                      -- ^ candidate chains, one variable per upstream peer
                      -> StrictTVar m (ChainProducerState block)
                      -- ^ producer state holding the currently selected chain
                      -> m ()
longestChainSelection candidateChainVars cpsVar =
    forever (atomically updateCurrentChain)
  where
    updateCurrentChain :: STM m ()
    updateCurrentChain = do
      candidateChains <- mapM readTVar candidateChainVars
      cps@ChainProducerState{chainState = chain} <- readTVar cpsVar
      let -- using foldl' since @Chain.selectChain@ is left biased
          chain' = List.foldl' Chain.selectChain chain (catMaybes candidateChains)
      -- nothing to do if the selection did not move the head; wait until one
      -- of the variables read above changes
      if Chain.headPoint chain' == Chain.headPoint chain
        then retry
        else writeTVar cpsVar (switchFork chain' cps)


-- | Validate the chain of a single upstream peer.
--
-- Runs forever.  It keeps a private variable with the last seen head point
-- (initially 'genesisPoint').  Each iteration is a single STM transaction
-- which blocks (via 'check') until the head point of the peer's chain differs
-- from the recorded one, records the new head point, and then writes to the
-- candidate variable either @'Just' chain@ if 'Chain.valid' holds for the
-- peer's chain, or 'Nothing' otherwise.
chainValidation :: forall block m. (HasFullHeader block, MonadSTM m)
                => StrictTVar m (Chain block)
                -- ^ chain received from the peer (input)
                -> StrictTVar m (Maybe (Chain block))
                -- ^ validated candidate chain (output)
                -> m ()
chainValidation peerChainVar candidateChainVar = do
    st <- atomically (newTVar genesisPoint)
    forever (atomically (update st))
  where
    update :: StrictTVar m (Point block) -> STM m ()
    update stateVar = do
      peerChain      <- readTVar peerChainVar
      candidatePoint <- readTVar stateVar
      -- only proceed when the peer's head has moved since the last run
      check (Chain.headPoint peerChain /= candidatePoint)
      writeTVar stateVar (Chain.headPoint peerChain)
      let candidateChain | Chain.valid peerChain = Just peerChain
                         | otherwise             = Nothing
      writeTVar candidateChainVar candidateChain


-- | Simulated network channels for a given network node.
--
-- Both fields are lists of @chain-sync@ channels carrying 'AnyMessage's.
--
data NodeChannels m block tip = NodeChannels
  { consumerChans :: [Channel m (AnyMessage (ChainSync block (Point block) tip))]
    -- ^ channels on which the node will play the consumer role:
    -- sending @consMsg@ and receiving @prodMsg@ messages.
  , producerChans :: [Channel m (AnyMessage (ChainSync block (Point block) tip))]
    -- ^ channels on which the node will play the producer role:
    -- sending @prodMsg@ and receiving @consMsg@ messages.
  }

-- | Combine two sets of channels field-wise: consumer channels are appended
-- to consumer channels and producer channels to producer channels, with the
-- left operand's channels first.
instance Semigroup (NodeChannels m block tip) where
  NodeChannels c1 p1 <> NodeChannels c2 p2 = NodeChannels (c1 ++ c2) (p1 ++ p2)

-- | The empty 'NodeChannels' has no consumer and no producer channels.
instance Monoid (NodeChannels m block tip) where
  mempty = NodeChannels [] []

-- | Create channels n1 → n2, where n1 is a producer and n2 is the consumer.
--
-- A single pair of connected channels is created with
-- 'createConnectedChannels'.  The first component of the result holds one end
-- as its only producer channel, the second component holds the other end as
-- its only consumer channel; all other channel lists are empty.  Each end is
-- wrapped with 'delayChannel'.
--
createOneWaySubscriptionChannels
  :: forall block tip m.
     ( MonadSTM m
     , MonadDelay m
     )
  => DiffTime
  -- ^ delay passed to 'delayChannel' for the producer (n1) end
  -> DiffTime
  -- ^ delay passed to 'delayChannel' for the consumer (n2) end
  -> m (NodeChannels m block tip, NodeChannels m block tip)
createOneWaySubscriptionChannels trDelay1 trDelay2 = do
  (cr, rc) <- createConnectedChannels
  return
    ( NodeChannels
        { consumerChans = []
        , producerChans = [delayChannel trDelay1 cr]
        }
    , NodeChannels
        { consumerChans = [delayChannel trDelay2 rc]
        , producerChans = []
        }
    )

-- | Create channels for n1 ↔ n2 where both nodes are a consumer and a producer
-- simultaneously.
--
-- This is built from two one-way subscriptions, one in each direction, merged
-- with '<>'.  The first component of the result holds the channels of n1
-- (one producer and one consumer channel, both delayed by the first
-- argument); the second component holds the channels of n2 (both delayed by
-- the second argument).
--
createTwoWaySubscriptionChannels
  :: forall block tip m.
     ( MonadDelay m
     , MonadLabelledSTM m
     , MonadTimer m
     )
  => DiffTime
  -- ^ delay used for the channel ends held by n1
  -> DiffTime
  -- ^ delay used for the channel ends held by n2
  -> m (NodeChannels m block tip, NodeChannels m block tip)
createTwoWaySubscriptionChannels trDelay1 trDelay2 = do
  -- n1 produces, n2 consumes
  r12 <- createOneWaySubscriptionChannels trDelay1 trDelay2
  -- n2 produces, n1 consumes; the result is ordered (n2, n1), hence the 'swap'
  r21 <- createOneWaySubscriptionChannels trDelay2 trDelay1
  return $ r12 <> swap r21

-- | Generate a block from a given chain.  Each @block@ is produced at
-- @slotDuration * blockSlot block@ time.
--
-- A thread is forked which walks the list of blocks; before emitting a block
-- it sleeps for @slotDuration@ multiplied by the slot difference to the
-- previously emitted block (or to slot @0@ for the first block).
blockGenerator :: forall block m.
                  ( HasHeader block
                  , MonadDelay m
                  , MonadSTM m
                  , MonadFork m
                  )
               => DiffTime
               -- ^ slot duration
               -> [block]
               -- ^ The list of blocks to generate in increasing slot order.
               -- This allows for upstream users to generate \"half chains\" in
               -- case we want to simulate nodes having access to already part
               -- of the overall chain.
               -> m (STM m (Maybe block))
               -- ^ returns an stm transaction which returns block.  @Nothing@
               -- signifies that there will be no more blocks.  It is the caller
               -- responsibility to read this transaction until @Nothing@ is
               -- returned.
blockGenerator slotDuration chain = do
  -- communicate through a @TBQueue@, it is enough to make it very shallow,
  -- since it is supposed to be written and read once a slot time.
  var <- atomically (newTBQueue 1)
  void $ forkIO $ go var Nothing chain
  return (readTBQueue var)
 where
  -- The second argument is the slot of the previously emitted block, if any.
  go :: StrictTBQueue m (Maybe block) -> Maybe SlotNo -> [block] -> m ()
  go var _ [] =
    -- signal the end of the stream
    atomically (writeTBQueue var Nothing)
  go var mslot (b : bs) = do
    let slot  = blockSlot b
        -- NOTE: this is an unsigned ('Word64') subtraction; it relies on the
        -- blocks being given in increasing slot order, otherwise it wraps
        -- around.
        delay = unSlotNo slot - maybe 0 unSlotNo mslot
    threadDelay (slotDuration * fromIntegral delay)
    atomically (writeTBQueue var (Just b))
    go var (Just slot) bs

-- | Observe @StrictTVar ('ChainProducerState' block)@, and whenever the head
-- point of its producer chain changes, record the new chain in the supplied
-- probe variable.
--
-- Runs forever.  It keeps a private variable with the last observed head
-- point (initially 'genesisPoint').  Each iteration is a single STM
-- transaction which blocks (via 'check') until the head point of
-- 'producerChain' differs from the recorded one, records it, and prepends
-- @(nid, chain)@ to the probe (so the most recent observation is at the head
-- of the list).
--
observeChainProducerState
  :: forall m block.
     ( HasHeader block
     , MonadSTM m
     )
  => NodeId
  -- ^ identifier stored alongside every observed chain
  -> StrictTVar m [(NodeId, Chain block)]
  -- ^ probe: accumulates observations
  -> StrictTVar m (ChainProducerState block)
  -- ^ the observed producer state
  -> m ()
observeChainProducerState nid probe cpsVar = do
    st <- atomically (newTVar genesisPoint)
    forever (update st)
  where
    update :: StrictTVar m (Point block) -> m ()
    update stateVar = atomically $ do
      chain    <- producerChain <$> readTVar cpsVar
      curPoint <- readTVar stateVar
      -- only proceed when the head has moved since the last observation
      check (Chain.headPoint chain /= curPoint)
      writeTVar stateVar (Chain.headPoint chain)
      modifyTVar probe ((nid, chain):)

-- | Identifier of a consumer: a 'NodeId' paired with an 'Int'.
data ConsumerId = ConsumerId NodeId Int
  deriving (Eq, Ord, Show)

-- | Identifier of a producer: a 'NodeId' paired with an 'Int'.
data ProducerId = ProducerId NodeId Int
  deriving (Eq, Ord, Show)


-- | Fork the threads which form the kernel of a relay node.
--
-- For every upstream chain variable a fresh candidate variable (initially
-- 'Nothing') is created and a 'chainValidation' thread is forked which
-- connects the two.  Additionally a single 'longestChainSelection' thread is
-- forked which reads all candidate variables and updates the given
-- 'ChainProducerState'.
--
-- This function does not block: it returns once all threads are forked.  The
-- forked threads' ids are discarded.
forkRelayKernel :: forall block m.
                ( HasFullHeader block
                , MonadSTM m
                , MonadFork m
                )
                => [StrictTVar m (Chain block)]
                -- ^ These will track the upstream producers.
                -> StrictTVar m (ChainProducerState block)
                -- ^ This is tracking the current node and the downstream.
                -> m ()
forkRelayKernel upstream cpsVar = do
  -- Mutable state
  -- 2. candidate chains
  candidateChainVars <- replicateM (length upstream) (atomically (newTVar Nothing))
  -- chain validation threads
  zipWithM_
    (\chain cchain -> forkIO $ chainValidation chain cchain)
    upstream
    candidateChainVars
  -- chain selection thread
  void $ forkIO $ longestChainSelection candidateChainVars cpsVar

-- | Relay node, which takes @'NodeChannels'@ to communicate with its peers
-- (upstream and downstream).  If it is subscribed to n nodes and has
-- m subscriptions, it will run n consumer end protocols which listen for
-- updates; verify chains and select the longest one and feed it to the producer
-- side which sends updates to its m subscribers.
--
-- The main thread of the @'relayNode'@ is not blocking; it will return
-- @StrictTVar ('ChainProducerState' block)@. This allows to extend the relay
-- node to a core node.
relayNode :: forall m block.
             ( MonadFork m
             , MonadTimer m
             , MonadThrow m
             , MonadSay m
             , HasFullHeader block
             , Show block
             , ShowProxy block
             )
          => NodeId
          -> Chain block
          -> NodeChannels m block (Tip block)
          -> m (StrictTVar m (ChainProducerState block))
relayNode :: forall (m :: * -> *) block.
(MonadFork m, MonadTimer m, MonadThrow m, MonadSay m,
 HasFullHeader block, Show block, ShowProxy block) =>
NodeId
-> Chain block
-> NodeChannels m block (Tip block)
-> m (StrictTVar m (ChainProducerState block))
relayNode NodeId
_nid Chain block
initChain NodeChannels m block (Tip block)
chans = do
  -- Mutable state
  -- 1. input chains
  upstream <- (Int
 -> Channel
      m (AnyMessage (ChainSync block (Point block) (Tip block)))
 -> m (StrictTVar m (Chain block)))
-> [Int]
-> [Channel
      m (AnyMessage (ChainSync block (Point block) (Tip block)))]
-> m [StrictTVar m (Chain block)]
forall (m :: * -> *) a b c.
Applicative m =>
(a -> b -> m c) -> [a] -> [b] -> m [c]
zipWithM Int
-> Channel
     m (AnyMessage (ChainSync block (Point block) (Tip block)))
-> m (StrictTVar m (Chain block))
startConsumer [Int
0..] (NodeChannels m block (Tip block)
-> [Channel
      m (AnyMessage (ChainSync block (Point block) (Tip block)))]
forall {k} {k2} (m :: * -> *) (block :: k) (tip :: k2).
NodeChannels m block tip
-> [Channel m (AnyMessage (ChainSync block (Point block) tip))]
consumerChans NodeChannels m block (Tip block)
chans)
  -- 2. ChainProducerState
  cpsVar <- atomically $ newTVar (initChainProducerState initChain)

  forkRelayKernel upstream cpsVar

  -- producers which share @'ChainProducerState'@
  let producer = ChainSyncServer block (Point block) (Tip block) m ()
-> Server
     (ChainSync block (Point block) (Tip block))
     'NonPipelined
     'StIdle
     m
     ()
forall header point tip (m :: * -> *) a.
Monad m =>
ChainSyncServer header point tip m a
-> Server (ChainSync header point tip) 'NonPipelined 'StIdle m a
chainSyncServerPeer (()
-> StrictTVar m (ChainProducerState block)
-> (block -> block)
-> ChainSyncServer block (Point block) (Tip block) m ()
forall blk header (m :: * -> *) a.
(HasHeader blk, MonadSTM m, HeaderHash header ~ HeaderHash blk) =>
a
-> StrictTVar m (ChainProducerState blk)
-> (blk -> header)
-> ChainSyncServer header (Point blk) (Tip blk) m a
chainSyncServerExample () StrictTVar m (ChainProducerState block)
cpsVar block -> block
forall a. a -> a
id)
  mapM_ (uncurry $ startProducer producer) (zip [0..] (producerChans chans))

  return cpsVar
  where
    -- Note: there is asymmetry between producers and consumers: we run single
    -- @'ProducerHandlers'@ and multiple @'ConsumerHandlers'@.  An efficient
    -- implementation should run as many producers as channels and not share
    -- more state between producers than necessary (here the producers share chain
    -- state and all the reader states, while we could share just the chain).
    startConsumer :: Int
                  -> Channel m (AnyMessage (ChainSync block (Point block) (Tip block)))
                  -> m (StrictTVar m (Chain block))
    startConsumer :: Int
-> Channel
     m (AnyMessage (ChainSync block (Point block) (Tip block)))
-> m (StrictTVar m (Chain block))
startConsumer Int
_cid Channel m (AnyMessage (ChainSync block (Point block) (Tip block)))
channel = do
      chainVar <- STM m (StrictTVar m (Chain block))
-> m (StrictTVar m (Chain block))
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (StrictTVar m (Chain block))
 -> m (StrictTVar m (Chain block)))
-> STM m (StrictTVar m (Chain block))
-> m (StrictTVar m (Chain block))
forall a b. (a -> b) -> a -> b
$ Chain block -> STM m (StrictTVar m (Chain block))
forall (m :: * -> *) a. MonadSTM m => a -> STM m (StrictTVar m a)
newTVar Chain block
forall block. Chain block
Genesis
      let consumer = ChainSyncClient block (Point block) tip m a
-> Client
     (ChainSync block (Point block) tip) 'NonPipelined 'StIdle m a
forall header point tip (m :: * -> *) a.
Monad m =>
ChainSyncClient header point tip m a
-> Client (ChainSync header point tip) 'NonPipelined 'StIdle m a
chainSyncClientPeer (StrictTVar m (Chain block)
-> Client block (Point block) tip m a
-> ChainSyncClient block (Point block) tip m a
forall header block tip (m :: * -> *) a.
(HasHeader header, HasHeader block,
 HeaderHash header ~ HeaderHash block, MonadSTM m) =>
StrictTVar m (Chain header)
-> Client header (Point block) tip m a
-> ChainSyncClient header (Point block) tip m a
chainSyncClientExample StrictTVar m (Chain block)
chainVar Client block (Point block) tip m a
forall (m :: * -> *) header point tip void.
Applicative m =>
Client header point tip m void
pureClient)
      void $ forkIO $ void $ runPeer nullTracer
                                   codecChainSyncId
                                   channel
                                   consumer
      return chainVar

    -- Run the given chain-sync server peer over @channel@ in a freshly forked
    -- thread and return immediately.  The 'Int' argument (a producer id) is
    -- accepted for interface compatibility but is not used.
    --
    -- Two results are discarded with 'void': the inner one drops the
    -- @((), Maybe bytes)@ result of 'runPeer' so the action has the type
    -- @m ()@ that 'forkIO' requires, the outer one drops the 'ThreadId'
    -- returned by 'forkIO'.
    startProducer :: Server (ChainSync block (Point block) (Tip block)) NonPipelined StIdle m ()
                  -> Int
                  -> Channel m (AnyMessage (ChainSync block (Point block) (Tip block)))
                  -> m ()
    startProducer producer _pid channel =
      -- use 'void' because 'forkIO' only works with 'm ()'
      -- No sense throwing on Unexpected right? since forkIO will just squelch
      -- it. FIXME: use async...
      void $ forkIO $ void $
        runPeer nullTracer
                codecChainSyncId
                channel
                producer

-- | Core node simulation.  Given a chain it will generate a @block@ at its
-- slot time (i.e. @slotDuration * blockSlot block@).  When the node finds out
-- that the slot for which it was supposed to generate a block was already
-- occupied, it will replace it with its block.
--
-- The function obtains a block source from 'blockGenerator' and forks a
-- thread which repeatedly takes the next generated block and, in the same
-- STM transaction, adds it to the chain held in the given
-- 'ChainProducerState' variable (via 'switchFork').  The thread stops once
-- the block source yields 'Nothing'.  The call itself returns right after
-- forking.
--
-- A generated block whose slot is strictly smaller than the slot of the
-- current chain head is treated as an invariant violation and raises an
-- 'error'.
--
-- TODO: This should not take a list of blocks, but rather a monadic action
-- to generate the blocks. At that point the 'fixup' argument can go also.
-- Alternatively, we should move this to the tests, and remove it from the
-- public network layer altogether.
--
forkCoreKernel :: forall block m.
                  ( HasFullHeader block
                  , MonadDelay m
                  , MonadLabelledSTM m
                  , MonadFork m
                  , MonadTimer m
                  )
               => DiffTime
               -- ^ slot duration
               -> [block]
               -- ^ Blocks to produce (in order they should be produced)
               -> (Chain block -> block -> block)
               -- ^ fixup applied to each generated block, given the current
               -- chain, before the block is added to that chain
               -> StrictTVar m (ChainProducerState block)
               -- ^ producer state which is updated with every generated block
               -> m ()
forkCoreKernel slotDuration gchain fixupBlock cpsVar = do
  getBlock <- blockGenerator slotDuration gchain
  void $ forkIO $ applyGeneratedBlock getBlock

  where
    -- Loop: atomically take the next generated block (if any) and extend the
    -- chain stored in 'cpsVar' with it; finish when the source is exhausted.
    applyGeneratedBlock
      :: STM m (Maybe block)
      -> m ()
    applyGeneratedBlock getBlock = do
      cont <- atomically $ do
        mblock <- getBlock
        case mblock of
          Nothing    -> return False
          Just block -> do
            -- reading the state and writing it back happen in the same
            -- transaction as taking the block
            cps@ChainProducerState{chainState = chain} <- readTVar cpsVar
            writeTVar cpsVar (switchFork (addBlock chain block) cps)
            return True
      when cont $ applyGeneratedBlock getBlock

    -- Add a (fixed-up) block on top of the given chain.  The resulting chain
    -- is checked with 'Chain.valid' through 'assert'.
    addBlock :: Chain block -> block -> Chain block
    addBlock c b =
      case At (blockSlot b) `compare` Chain.headSlot c of
        LT -> error "blockGenerator invariant vaiolation: generated block is for slot in the past"
        -- the block is OK (slot number _can_ be equal).
        _  -> let r = Chain.addBlock (fixupBlock c b) c
              in assert (Chain.valid r) r

-- | Core node simulation: a relay node which additionally produces blocks.
--
-- It starts a 'relayNode' on the given channels with the 'Genesis' chain and
-- then hands the resulting producer state variable to 'forkCoreKernel',
-- which generates the given blocks at their slot times (i.e.
-- @slotDuration * blockSlot block@).  Each generated block is passed through
-- 'Concrete.fixupBlock' together with the head anchor of the current chain
-- before it is added.
--
-- The returned variable is the one created by 'relayNode'; it is the same
-- variable that the forked block production updates.
--
coreNode :: forall m.
        ( MonadDelay m
        , MonadLabelledSTM m
        , MonadFork m
        , MonadThrow m
        , MonadTimer m
        , MonadSay m
        )
     => NodeId
     -> DiffTime
     -- ^ slot duration
     -> [Block]
     -- ^ blocks to produce, passed on to 'forkCoreKernel'
     -> NodeChannels m Block (Tip Block)
     -> m (StrictTVar m (ChainProducerState Block))
coreNode nid slotDuration gchain chans = do
  cpsVar <- relayNode nid Genesis chans
  forkCoreKernel slotDuration gchain fixupBlock cpsVar
  return cpsVar
  where
    -- Fix up a block with respect to the head anchor of the chain it is
    -- going to extend.
    fixupBlock :: Chain Block -> Block -> Block
    fixupBlock c = Concrete.fixupBlock (Chain.headAnchor c)