{-# LANGUAGE CPP #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DisambiguateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Cardano.Network.Diffusion
( module Cardano.Network.Diffusion.Types
, run
) where
import Control.Monad.Class.MonadThrow
import Control.Tracer (traceWith)
import Data.Set qualified as Set
import Data.Void (Void)
import System.Exit (ExitCode)
import Cardano.Network.Diffusion.Configuration qualified as Cardano
import Cardano.Network.Diffusion.Handlers qualified as Cardano
import Cardano.Network.Diffusion.Types
import Cardano.Network.LedgerPeerConsensusInterface qualified as Cardano
import Cardano.Network.NodeToClient qualified as NodeToClient
import Cardano.Network.NodeToNode (NodeToNodeVersionData (..), RemoteAddress,
ntnDataFlow)
import Cardano.Network.NodeToNode qualified as NodeToNode
import Cardano.Network.PeerSelection.Churn qualified as Cardano.Churn
import Cardano.Network.PeerSelection.ExtraRootPeers qualified as Cardano
import Cardano.Network.PeerSelection.Governor.PeerSelectionActions qualified as Cardano
import Cardano.Network.PeerSelection.Governor.PeerSelectionState qualified as Cardano.PeerSelectionState
import Cardano.Network.PeerSelection.Governor.Types qualified as Cardano.Types
import Cardano.Network.PeerSelection.PeerSelectionActions qualified as Cardano
import Ouroboros.Network.Diffusion qualified as Diffusion
import Ouroboros.Network.IOManager
import Ouroboros.Network.PeerSelection.LedgerPeers.Type
(LedgerPeersConsensusInterface (..))
import Ouroboros.Network.PeerSelection.PeerStateActions
import Ouroboros.Network.Protocol.Handshake
-- | Run the Cardano data diffusion.
--
-- The function:
--
-- * builds the node-to-node and node-to-client 'HandshakeArguments' from the
--   supplied tracers and the respective codecs;
-- * acquires an 'IOManager' with 'withIOManager' and creates the network
--   interfaces with 'Diffusion.mkInterfaces';
-- * hands control over to 'Diffusion.runM', passing Cardano specific
--   'Diffusion.Arguments' together with the tracers, configuration and
--   applications it was given.
--
-- Every exception other than 'ExitCode' that escapes the above is traced as
-- 'Diffusion.DiffusionErrored' and then rethrown wrapped in
-- 'Diffusion.DiffusionError'.  'ExitCode' exceptions are not intercepted.
run :: CardanoNodeArguments IO
    -- ^ supplies the consensus mode, genesis peer selection targets, the
    -- minimum number of big ledger peers and the churn mode tracer
    -> CardanoConsensusArguments RemoteAddress IO
    -- ^ supplies the churn mode variable, churn metrics, the ledger peers
    -- API and the action reading the bootstrap peers setting
    -> CardanoTracers IO
    -- ^ tracers; also the source of the handshake and bearer tracers
    -> CardanoConfiguration IO
    -- ^ diffusion configuration, forwarded to 'Diffusion.runM'
    -> CardanoApplications IO a
    -- ^ applications, forwarded to 'Diffusion.runM'
    -> IO Void
run CardanoNodeArguments {
      consensusMode,
      genesisPeerSelectionTargets,
      minNumOfBigLedgerPeers,
      tracerChurnMode
    }
    CardanoConsensusArguments {
      churnModeVar,
      churnMetrics,
      ledgerPeersAPI,
      readUseBootstrapPeers
    }
    tracers config apps = do
    let tracer = Diffusion.dtDiffusionTracer tracers

        -- node-to-node handshake: uses the regular handshake time limits
        daNtnHandshakeArguments =
          HandshakeArguments {
              haBearerTracer     = Diffusion.dtBearerTracer tracers,
              haHandshakeTracer  = Diffusion.dtHandshakeTracer tracers,
              haHandshakeCodec   = NodeToNode.nodeToNodeHandshakeCodec,
              haVersionDataCodec =
                cborTermVersionDataCodec
                  NodeToNode.nodeToNodeCodecCBORTerm,
              haAcceptVersion    = acceptableVersion,
              haQueryVersion     = queryVersion,
              haTimeLimits       = timeLimitsHandshake
            }

        -- node-to-client handshake: same shape as above, but built from the
        -- local tracers / codecs and without handshake time limits
        daNtcHandshakeArguments =
          HandshakeArguments {
              haBearerTracer     = Diffusion.dtLocalBearerTracer tracers,
              haHandshakeTracer  = Diffusion.dtLocalHandshakeTracer tracers,
              haHandshakeCodec   = NodeToClient.nodeToClientHandshakeCodec,
              haVersionDataCodec =
                cborTermVersionDataCodec
                  NodeToClient.nodeToClientCodecCBORTerm,
              haAcceptVersion    = acceptableVersion,
              haQueryVersion     = queryVersion,
              haTimeLimits       = noTimeLimitsHandshake
            }

    -- Select every exception except 'ExitCode': the selected ones are traced
    -- and rethrown as 'Diffusion.DiffusionError', while 'ExitCode' propagates
    -- untouched.
    handleJust (\e -> case fromException e :: Maybe ExitCode of
                  Nothing -> Just e
                  Just {} -> Nothing)
               (\e -> traceWith tracer (Diffusion.DiffusionErrored e)
                   >> throwIO (Diffusion.DiffusionError e))
         $ withIOManager $ \iocp -> do
             interfaces <-
               Diffusion.mkInterfaces
                 iocp
                 tracer
                 (Diffusion.dcEgressPollInterval config)
             Diffusion.runM
               interfaces
               tracers
               Diffusion.Arguments {
                 daNtnDataFlow    = ntnDataFlow,
                 daNtnPeerSharing = peerSharing,
                 -- record update with a punned field: only 'diffusionMode'
                 -- of the version data is replaced
                 daUpdateVersionData =
                   \versionData diffusionMode -> versionData { diffusionMode },
                 daNtnHandshakeArguments,
                 daNtcHandshakeArguments,
                 daLedgerPeersCtx = ledgerPeersAPI,
                 daEmptyExtraState =
                   Cardano.PeerSelectionState.empty
                     consensusMode
                     minNumOfBigLedgerPeers,
                 daEmptyExtraCounters = Cardano.Types.empty,
                 daExtraPeersAPI = Cardano.cardanoPublicRootPeersAPI,
                 daInstallSigUSR1Handler =
                   Cardano.sigUSR1Handler
                     tracers
                     (Diffusion.dcReadUseLedgerPeers config)
                     (Diffusion.dcPeerSharing config)
                     readUseBootstrapPeers
                     (Cardano.getLedgerStateJudgement (lpExtraAPI ledgerPeersAPI))
                     churnMetrics
                     getPromotedHotTime,
                 daPeerSelectionGovernorArgs =
                   Cardano.Types.cardanoPeerSelectionGovernorArgs
                     Cardano.ExtraPeerSelectionActions {
                       Cardano.genesisPeerSelectionTargets = genesisPeerSelectionTargets,
                       Cardano.readUseBootstrapPeers       = readUseBootstrapPeers
                     },
                 daPeerSelectionStateToExtraCounters =
                   Cardano.Types.cardanoPeerSelectionStatetoCounters,
                 daToExtraPeers = flip Cardano.ExtraPeers Set.empty,
                 daRequestPublicRootPeers =
                   Just $ Cardano.requestPublicRootPeers
                            (Diffusion.dtTracePublicRootPeersTracer tracers)
                            readUseBootstrapPeers
                            (Cardano.getLedgerStateJudgement (lpExtraAPI ledgerPeersAPI))
                            (Diffusion.dcReadPublicRootPeers config),
                 daPeerChurnGovernor = Cardano.Churn.peerChurnGovernor,
                 daExtraChurnArgs =
                   Cardano.Churn.ExtraArguments {
                     Cardano.Churn.modeVar          = churnModeVar,
                     Cardano.Churn.genesisPeerSelectionTargets
                                                    = genesisPeerSelectionTargets,
                     Cardano.Churn.readUseBootstrap = readUseBootstrapPeers,
                     Cardano.Churn.consensusMode    = consensusMode,
                     Cardano.Churn.tracerChurnMode  = tracerChurnMode
                   },
                 daSRVPrefix = Cardano.srvPrefix
               }
               config apps