{-# LANGUAGE CPP                      #-}
{-# LANGUAGE DataKinds                #-}
{-# LANGUAGE DisambiguateRecordFields #-}
{-# LANGUAGE FlexibleContexts         #-}
{-# LANGUAGE GADTs                    #-}
{-# LANGUAGE KindSignatures           #-}
{-# LANGUAGE NamedFieldPuns           #-}
{-# LANGUAGE RankNTypes               #-}
{-# LANGUAGE ScopedTypeVariables      #-}

-- | This module is expected to be imported qualified (it will clash
-- with the "Ouroboros.Network.Diffusion.NonP2P").
--
module Cardano.Network.Diffusion
  ( module Cardano.Network.Diffusion.Types
  , run
  ) where

import Control.Monad.Class.MonadThrow
import Control.Tracer (traceWith)
import Data.Set qualified as Set
import Data.Void (Void)
import System.Exit (ExitCode)

import Cardano.Network.Diffusion.Configuration qualified as Cardano
import Cardano.Network.Diffusion.Handlers qualified as Cardano
import Cardano.Network.Diffusion.Types
import Cardano.Network.LedgerPeerConsensusInterface qualified as Cardano
import Cardano.Network.NodeToClient qualified as NodeToClient
import Cardano.Network.NodeToNode (NodeToNodeVersionData (..), RemoteAddress,
           ntnDataFlow)
import Cardano.Network.NodeToNode qualified as NodeToNode
import Cardano.Network.PeerSelection.Churn qualified as Cardano.Churn
import Cardano.Network.PeerSelection.ExtraRootPeers qualified as Cardano
import Cardano.Network.PeerSelection.Governor.PeerSelectionActions qualified as Cardano
import Cardano.Network.PeerSelection.Governor.PeerSelectionState qualified as Cardano.PeerSelectionState
import Cardano.Network.PeerSelection.Governor.Types qualified as Cardano.Types
import Cardano.Network.PeerSelection.PeerSelectionActions qualified as Cardano

import Ouroboros.Network.Diffusion qualified as Diffusion
import Ouroboros.Network.IOManager
import Ouroboros.Network.PeerSelection.LedgerPeers.Type
           (LedgerPeersConsensusInterface (..))
import Ouroboros.Network.PeerSelection.PeerStateActions
import Ouroboros.Network.Protocol.Handshake

-- | Main entry point for the Cardano data diffusion service.  It:
--
-- * connects to upstream peers;
-- * accepts connections from downstream peers, if run in
--   'InitiatorAndResponderDiffusionMode';
-- * runs a local service which allows using the node-to-client protocol to
--   obtain information from the running system.  This is used by
--   @cardano-cli@, a wallet, and similar local services.
--
-- Exception handling: any exception other than 'ExitCode' that escapes the
-- diffusion is first traced as 'Diffusion.DiffusionErrored' and then
-- rethrown wrapped in 'Diffusion.DiffusionError'.  'ExitCode' exceptions are
-- not intercepted and propagate unchanged.
--
run :: CardanoNodeArguments IO
    -- ^ node API: instantiated in `cardano-node`.
    -> CardanoConsensusArguments RemoteAddress IO
    -- ^ consensus API; instantiated in `ouroboros-consensus-diffusion` (with
    -- exception of `readUseBootstrapPeers` field).
    -> CardanoTracers IO
    -- ^ generic diffusion tracers; instantiated in `cardano-node`.
    -> CardanoConfiguration IO
    -- ^ generic diffusion configuration; instantiated in `cardano-node`.
    -> CardanoApplications IO a
    -- ^ cardano specific applications; instantiated in
    -- `ouroboros-consensus-diffusion`.
    -> IO Void
run CardanoNodeArguments {
      consensusMode,
      genesisPeerSelectionTargets,
      minNumOfBigLedgerPeers,
      tracerChurnMode
    }
    CardanoConsensusArguments {
      churnModeVar,
      churnMetrics,
      ledgerPeersAPI,
      readUseBootstrapPeers
    }
    tracers config apps = do
    let tracer = Diffusion.dtDiffusionTracer tracers

        -- Handshake arguments for the /node-to-node/ service.
        daNtnHandshakeArguments =
          HandshakeArguments {
              haBearerTracer    = Diffusion.dtBearerTracer tracers,
              haHandshakeTracer = Diffusion.dtHandshakeTracer tracers,
              haHandshakeCodec  = NodeToNode.nodeToNodeHandshakeCodec,
              haVersionDataCodec =
                cborTermVersionDataCodec
                  NodeToNode.nodeToNodeCodecCBORTerm,
              haAcceptVersion = acceptableVersion,
              haQueryVersion = queryVersion,
              haTimeLimits = timeLimitsHandshake
            }

        -- Handshake arguments for the /node-to-client/ service.  Note that,
        -- unlike the node-to-node handshake above, this one uses
        -- 'noTimeLimitsHandshake'.
        daNtcHandshakeArguments =
          HandshakeArguments {
              haBearerTracer     = Diffusion.dtLocalBearerTracer tracers,
              haHandshakeTracer  = Diffusion.dtLocalHandshakeTracer tracers,
              haHandshakeCodec   = NodeToClient.nodeToClientHandshakeCodec,
              haVersionDataCodec =
                cborTermVersionDataCodec
                  NodeToClient.nodeToClientCodecCBORTerm,
              haAcceptVersion = acceptableVersion,
              haQueryVersion = queryVersion,
              haTimeLimits = noTimeLimitsHandshake
            }

    -- We run two services: for /node-to-node/ and /node-to-client/.  The
    -- naming convention is that we use /local/ prefix for /node-to-client/
    -- related terms, as this is a local only service running over a unix
    -- socket / windows named pipe.
    --
    -- The selector below lets 'ExitCode' exceptions through untouched; every
    -- other exception is traced and rethrown as 'Diffusion.DiffusionError'.
    handleJust (\e -> case fromException e :: Maybe ExitCode of
                  Nothing -> Just e
                  Just {} -> Nothing)
               (\e -> traceWith tracer (Diffusion.DiffusionErrored e)
                   >> throwIO (Diffusion.DiffusionError e))
         $ withIOManager $ \iocp -> do
             interfaces <- Diffusion.mkInterfaces iocp tracer (Diffusion.dcEgressPollInterval config)
             Diffusion.runM
               interfaces
               tracers
               Diffusion.Arguments {
                  daNtnDataFlow    = ntnDataFlow,
                  daNtnPeerSharing = peerSharing,
                  daUpdateVersionData = \versionData diffusionMode -> versionData { diffusionMode },
                  daNtnHandshakeArguments,
                  daNtcHandshakeArguments,
                  daLedgerPeersCtx                    = ledgerPeersAPI,
                  daEmptyExtraState                   =
                    Cardano.PeerSelectionState.empty
                      consensusMode
                      minNumOfBigLedgerPeers,
                  daEmptyExtraCounters                = Cardano.Types.empty,
                  daExtraPeersAPI                     = Cardano.cardanoPublicRootPeersAPI,
                  daInstallSigUSR1Handler             =
                    Cardano.sigUSR1Handler
                      tracers
                      (Diffusion.dcReadUseLedgerPeers config)
                      (Diffusion.dcPeerSharing config)
                      readUseBootstrapPeers
                      (Cardano.getLedgerStateJudgement (lpExtraAPI ledgerPeersAPI))
                      churnMetrics
                      getPromotedHotTime,
                  daPeerSelectionGovernorArgs         =
                    Cardano.Types.cardanoPeerSelectionGovernorArgs
                      Cardano.ExtraPeerSelectionActions {
                        Cardano.genesisPeerSelectionTargets = genesisPeerSelectionTargets,
                        Cardano.readUseBootstrapPeers       = readUseBootstrapPeers
                      },
                  daPeerSelectionStateToExtraCounters = Cardano.Types.cardanoPeerSelectionStatetoCounters,
                  daToExtraPeers                      = flip Cardano.ExtraPeers Set.empty,
                  daRequestPublicRootPeers            =
                      Just $ Cardano.requestPublicRootPeers
                               (Diffusion.dtTracePublicRootPeersTracer tracers)
                               readUseBootstrapPeers
                               (Cardano.getLedgerStateJudgement (lpExtraAPI ledgerPeersAPI))
                               (Diffusion.dcReadPublicRootPeers config),
                  daPeerChurnGovernor                 = Cardano.Churn.peerChurnGovernor,
                  daExtraChurnArgs                    =
                    Cardano.Churn.ExtraArguments {
                      Cardano.Churn.modeVar            = churnModeVar,
                      Cardano.Churn.genesisPeerSelectionTargets
                                                       = genesisPeerSelectionTargets,
                      Cardano.Churn.readUseBootstrap   = readUseBootstrapPeers,
                      Cardano.Churn.consensusMode      = consensusMode,
                      Cardano.Churn.tracerChurnMode    = tracerChurnMode
                    },
                  daSRVPrefix                         = Cardano.srvPrefix
                }
               config apps