{-# LANGUAGE CPP                 #-}
{-# LANGUAGE DataKinds           #-}
{-# LANGUAGE GADTs               #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE RankNTypes          #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- | This module is expected to be imported qualified (it will clash
-- with the "Ouroboros.Network.Diffusion.P2P").
--
module Ouroboros.Network.Diffusion.NonP2P
  ( TracersExtra (..)
  , nullTracers
  , ApplicationsExtra (..)
  , ArgumentsExtra (..)
  , run
  ) where

import Control.Concurrent.Async qualified as Async
import Control.Exception
import Control.Tracer (Tracer, contramap, nullTracer, traceWith)
import Data.Foldable (asum)
import Data.Functor (void)
import Data.Maybe (maybeToList)
import Data.Proxy (Proxy (..))
import Data.Void (Void)
import System.Exit (ExitCode)

import Network.Socket (SockAddr, Socket)
import Network.Socket qualified as Socket

import Ouroboros.Network.Snocket (LocalAddress, LocalSnocket, LocalSocket (..),
           SocketSnocket, localSocketFileDescriptor)
import Ouroboros.Network.Snocket qualified as Snocket
import Ouroboros.Network.Socket (NetworkMutableState, NetworkServerTracers (..),
           cleanNetworkMutableState, configureSocket, configureSystemdSocket,
           newNetworkMutableState)

import Ouroboros.Network.Context (ExpandedInitiatorContext (..),
           IsBigLedgerPeer (..), MinimalInitiatorContext (..))
import Ouroboros.Network.ControlMessage (continueForever)
import Ouroboros.Network.Diffusion.Common hiding (nullTracers)
import Ouroboros.Network.ErrorPolicy
import Ouroboros.Network.IOManager
import Ouroboros.Network.Mux
import Ouroboros.Network.NodeToClient (NodeToClientVersion,
           NodeToClientVersionData)
import Ouroboros.Network.NodeToClient qualified as NodeToClient
import Ouroboros.Network.NodeToNode (AcceptConnectionsPolicyTrace (..),
           DiffusionMode (..), NodeToNodeVersion, NodeToNodeVersionData,
           RemoteAddress)
import Ouroboros.Network.NodeToNode qualified as NodeToNode
import Ouroboros.Network.Subscription.Dns
import Ouroboros.Network.Subscription.Ip
import Ouroboros.Network.Subscription.Worker (LocalAddresses (..))
import Ouroboros.Network.Tracers

-- | NonP2P DiffusionTracers Extras
--
data TracersExtra = TracersExtra {
      -- | IP subscription tracer
      --
      TracersExtra -> Tracer IO (WithIPList (SubscriptionTrace SockAddr))
dtIpSubscriptionTracer
        :: Tracer IO (WithIPList (SubscriptionTrace SockAddr))

      -- | DNS subscription tracer
      --
    , TracersExtra
-> Tracer IO (WithDomainName (SubscriptionTrace SockAddr))
dtDnsSubscriptionTracer
        :: Tracer IO (WithDomainName (SubscriptionTrace SockAddr))

      -- | DNS resolver tracer
      --
    , TracersExtra -> Tracer IO (WithDomainName DnsTrace)
dtDnsResolverTracer
        :: Tracer IO (WithDomainName DnsTrace)

    , TracersExtra -> Tracer IO (WithAddr SockAddr ErrorPolicyTrace)
dtErrorPolicyTracer
        :: Tracer IO (WithAddr SockAddr     ErrorPolicyTrace)

    , TracersExtra -> Tracer IO (WithAddr LocalAddress ErrorPolicyTrace)
dtLocalErrorPolicyTracer
        :: Tracer IO (WithAddr LocalAddress ErrorPolicyTrace)

      -- | Trace rate limiting of accepted connections
      --
    , TracersExtra -> Tracer IO AcceptConnectionsPolicyTrace
dtAcceptPolicyTracer
        :: Tracer IO AcceptConnectionsPolicyTrace
    }

nullTracers :: TracersExtra
nullTracers :: TracersExtra
nullTracers = TracersExtra
nonP2PNullTracers
  where
    nonP2PNullTracers :: TracersExtra
nonP2PNullTracers =
      TracersExtra {
        dtIpSubscriptionTracer :: Tracer IO (WithIPList (SubscriptionTrace SockAddr))
dtIpSubscriptionTracer   = Tracer IO (WithIPList (SubscriptionTrace SockAddr))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
      , dtDnsSubscriptionTracer :: Tracer IO (WithDomainName (SubscriptionTrace SockAddr))
dtDnsSubscriptionTracer  = Tracer IO (WithDomainName (SubscriptionTrace SockAddr))
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
      , dtDnsResolverTracer :: Tracer IO (WithDomainName DnsTrace)
dtDnsResolverTracer      = Tracer IO (WithDomainName DnsTrace)
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
      , dtErrorPolicyTracer :: Tracer IO (WithAddr SockAddr ErrorPolicyTrace)
dtErrorPolicyTracer      = Tracer IO (WithAddr SockAddr ErrorPolicyTrace)
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
      , dtLocalErrorPolicyTracer :: Tracer IO (WithAddr LocalAddress ErrorPolicyTrace)
dtLocalErrorPolicyTracer = Tracer IO (WithAddr LocalAddress ErrorPolicyTrace)
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
      , dtAcceptPolicyTracer :: Tracer IO AcceptConnectionsPolicyTrace
dtAcceptPolicyTracer     = Tracer IO AcceptConnectionsPolicyTrace
forall (m :: * -> *) a. Applicative m => Tracer m a
nullTracer
      }

-- | NonP2P extra arguments
--
data ArgumentsExtra = ArgumentsExtra {
      -- | ip subscription addresses
      --
      ArgumentsExtra -> IPSubscriptionTarget
daIpProducers  :: IPSubscriptionTarget

      -- | list of domain names to subscribe to
      --
    , ArgumentsExtra -> [DnsSubscriptionTarget]
daDnsProducers :: [DnsSubscriptionTarget]
    }

-- | NonP2P extra applications
--
newtype ApplicationsExtra = ApplicationsExtra {
      -- | Error policies
      --
      ApplicationsExtra -> ErrorPolicies
daErrorPolicies :: ErrorPolicies
    }

-- | Converts between OuroborosBundle and OuroborosApplication.
-- Converts from InitiatorResponderMode to ResponderMode.
--
-- Useful for sharing the same Applications modes.
--
mkResponderApp
    :: OuroborosBundleWithExpandedCtx     InitiatorResponderMode addr bs m a    b
    -> OuroborosApplicationWithMinimalCtx ResponderMode          addr bs m Void b
mkResponderApp :: forall addr bs (m :: * -> *) a b.
OuroborosBundleWithExpandedCtx
  'InitiatorResponderMode addr bs m a b
-> OuroborosApplicationWithMinimalCtx
     'ResponderMode addr bs m Void b
mkResponderApp OuroborosBundleWithExpandedCtx
  'InitiatorResponderMode addr bs m a b
bundle =
    [MiniProtocol
   'ResponderMode
   (MinimalInitiatorContext addr)
   (ResponderContext addr)
   bs
   m
   Void
   b]
-> OuroborosApplication
     'ResponderMode
     (MinimalInitiatorContext addr)
     (ResponderContext addr)
     bs
     m
     Void
     b
forall (mode :: MuxMode) initiatorCtx responderCtx bytes
       (m :: * -> *) a b.
[MiniProtocol mode initiatorCtx responderCtx bytes m a b]
-> OuroborosApplication mode initiatorCtx responderCtx bytes m a b
OuroborosApplication ([MiniProtocol
    'ResponderMode
    (MinimalInitiatorContext addr)
    (ResponderContext addr)
    bs
    m
    Void
    b]
 -> OuroborosApplication
      'ResponderMode
      (MinimalInitiatorContext addr)
      (ResponderContext addr)
      bs
      m
      Void
      b)
-> [MiniProtocol
      'ResponderMode
      (MinimalInitiatorContext addr)
      (ResponderContext addr)
      bs
      m
      Void
      b]
-> OuroborosApplication
     'ResponderMode
     (MinimalInitiatorContext addr)
     (ResponderContext addr)
     bs
     m
     Void
     b
forall a b. (a -> b) -> a -> b
$
      ([MiniProtocolWithExpandedCtx
    'InitiatorResponderMode addr bs m a b]
 -> [MiniProtocol
       'ResponderMode
       (MinimalInitiatorContext addr)
       (ResponderContext addr)
       bs
       m
       Void
       b])
-> OuroborosBundleWithExpandedCtx
     'InitiatorResponderMode addr bs m a b
-> [MiniProtocol
      'ResponderMode
      (MinimalInitiatorContext addr)
      (ResponderContext addr)
      bs
      m
      Void
      b]
forall m a. Monoid m => (a -> m) -> TemperatureBundle a -> m
forall (t :: * -> *) m a.
(Foldable t, Monoid m) =>
(a -> m) -> t a -> m
foldMap ((MiniProtocolWithExpandedCtx 'InitiatorResponderMode addr bs m a b
 -> MiniProtocol
      'ResponderMode
      (MinimalInitiatorContext addr)
      (ResponderContext addr)
      bs
      m
      Void
      b)
-> [MiniProtocolWithExpandedCtx
      'InitiatorResponderMode addr bs m a b]
-> [MiniProtocol
      'ResponderMode
      (MinimalInitiatorContext addr)
      (ResponderContext addr)
      bs
      m
      Void
      b]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap MiniProtocolWithExpandedCtx 'InitiatorResponderMode addr bs m a b
-> MiniProtocol
     'ResponderMode
     (MinimalInitiatorContext addr)
     (ResponderContext addr)
     bs
     m
     Void
     b
forall bs addr (m :: * -> *) a b.
MiniProtocolWithExpandedCtx 'InitiatorResponderMode bs addr m a b
-> MiniProtocolWithMinimalCtx 'ResponderMode bs addr m Void b
f) OuroborosBundleWithExpandedCtx
  'InitiatorResponderMode addr bs m a b
bundle
  where
    f :: MiniProtocolWithExpandedCtx InitiatorResponderMode bs addr m a    b
      -> MiniProtocolWithMinimalCtx  ResponderMode          bs addr m Void b
    f :: forall bs addr (m :: * -> *) a b.
MiniProtocolWithExpandedCtx 'InitiatorResponderMode bs addr m a b
-> MiniProtocolWithMinimalCtx 'ResponderMode bs addr m Void b
f MiniProtocol { MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum :: forall (mode :: MuxMode) initiatorCtx responderCtx bytes
       (m :: * -> *) a b.
MiniProtocol mode initiatorCtx responderCtx bytes m a b
-> MiniProtocolNum
miniProtocolNum
                   , MiniProtocolLimits
miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits :: forall (mode :: MuxMode) initiatorCtx responderCtx bytes
       (m :: * -> *) a b.
MiniProtocol mode initiatorCtx responderCtx bytes m a b
-> MiniProtocolLimits
miniProtocolLimits
                   , miniProtocolRun :: forall (mode :: MuxMode) initiatorCtx responderCtx bytes
       (m :: * -> *) a b.
MiniProtocol mode initiatorCtx responderCtx bytes m a b
-> RunMiniProtocol mode initiatorCtx responderCtx bytes m a b
miniProtocolRun = InitiatorAndResponderProtocol MiniProtocolCb (ExpandedInitiatorContext bs m) addr m a
_initiator
                                                                      MiniProtocolCb (ResponderContext bs) addr m b
responder
                   } =
      MiniProtocol { MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum
                   , MiniProtocolLimits
miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits
                   , miniProtocolRun :: RunMiniProtocol
  'ResponderMode
  (MinimalInitiatorContext bs)
  (ResponderContext bs)
  addr
  m
  Void
  b
miniProtocolRun = MiniProtocolCb (ResponderContext bs) addr m b
-> RunMiniProtocol
     'ResponderMode
     (MinimalInitiatorContext bs)
     (ResponderContext bs)
     addr
     m
     Void
     b
forall responderCtx bytes (m :: * -> *) b initiatorCtx.
MiniProtocolCb responderCtx bytes m b
-> RunMiniProtocol
     'ResponderMode initiatorCtx responderCtx bytes m Void b
ResponderProtocolOnly MiniProtocolCb (ResponderContext bs) addr m b
responder
                   }

run
    :: Tracers
         RemoteAddress NodeToNodeVersion
         LocalAddress  NodeToClientVersion
         IO
    -> TracersExtra
    -> Arguments
         IO
         Socket      RemoteAddress
         LocalSocket LocalAddress
    -> ArgumentsExtra
    -> Applications
         RemoteAddress NodeToNodeVersion   NodeToNodeVersionData
         LocalAddress  NodeToClientVersion NodeToClientVersionData
         IO a
    -> ApplicationsExtra
    -> IO ()
run :: forall a.
Tracers
  SockAddr NodeToNodeVersion LocalAddress NodeToClientVersion IO
-> TracersExtra
-> Arguments IO Socket SockAddr LocalSocket LocalAddress
-> ArgumentsExtra
-> Applications
     SockAddr
     NodeToNodeVersion
     NodeToNodeVersionData
     LocalAddress
     NodeToClientVersion
     NodeToClientVersionData
     IO
     a
-> ApplicationsExtra
-> IO ()
run Tracers
      { Tracer IO (WithMuxBearer (ConnectionId SockAddr) MuxTrace)
dtMuxTracer :: Tracer IO (WithMuxBearer (ConnectionId SockAddr) MuxTrace)
dtMuxTracer :: forall ntnAddr ntnVersion ntcAddr ntcVersion (m :: * -> *).
Tracers ntnAddr ntnVersion ntcAddr ntcVersion m
-> Tracer m (WithMuxBearer (ConnectionId ntnAddr) MuxTrace)
dtMuxTracer
      , Tracer IO (WithMuxBearer (ConnectionId LocalAddress) MuxTrace)
dtLocalMuxTracer :: Tracer IO (WithMuxBearer (ConnectionId LocalAddress) MuxTrace)
dtLocalMuxTracer :: forall ntnAddr ntnVersion ntcAddr ntcVersion (m :: * -> *).
Tracers ntnAddr ntnVersion ntcAddr ntcVersion m
-> Tracer m (WithMuxBearer (ConnectionId ntcAddr) MuxTrace)
dtLocalMuxTracer
      , Tracer IO (HandshakeTr SockAddr NodeToNodeVersion)
dtHandshakeTracer :: Tracer IO (HandshakeTr SockAddr NodeToNodeVersion)
dtHandshakeTracer :: forall ntnAddr ntnVersion ntcAddr ntcVersion (m :: * -> *).
Tracers ntnAddr ntnVersion ntcAddr ntcVersion m
-> Tracer m (HandshakeTr ntnAddr ntnVersion)
dtHandshakeTracer
      , Tracer IO (HandshakeTr LocalAddress NodeToClientVersion)
dtLocalHandshakeTracer :: Tracer IO (HandshakeTr LocalAddress NodeToClientVersion)
dtLocalHandshakeTracer :: forall ntnAddr ntnVersion ntcAddr ntcVersion (m :: * -> *).
Tracers ntnAddr ntnVersion ntcAddr ntcVersion m
-> Tracer m (HandshakeTr ntcAddr ntcVersion)
dtLocalHandshakeTracer
      , Tracer IO (DiffusionTracer SockAddr LocalAddress)
dtDiffusionTracer :: Tracer IO (DiffusionTracer SockAddr LocalAddress)
dtDiffusionTracer :: forall ntnAddr ntnVersion ntcAddr ntcVersion (m :: * -> *).
Tracers ntnAddr ntnVersion ntcAddr ntcVersion m
-> Tracer m (DiffusionTracer ntnAddr ntcAddr)
dtDiffusionTracer
      }
    TracersExtra
      { Tracer IO (WithIPList (SubscriptionTrace SockAddr))
dtIpSubscriptionTracer :: TracersExtra -> Tracer IO (WithIPList (SubscriptionTrace SockAddr))
dtIpSubscriptionTracer :: Tracer IO (WithIPList (SubscriptionTrace SockAddr))
dtIpSubscriptionTracer
      , Tracer IO (WithDomainName (SubscriptionTrace SockAddr))
dtDnsSubscriptionTracer :: TracersExtra
-> Tracer IO (WithDomainName (SubscriptionTrace SockAddr))
dtDnsSubscriptionTracer :: Tracer IO (WithDomainName (SubscriptionTrace SockAddr))
dtDnsSubscriptionTracer
      , Tracer IO (WithDomainName DnsTrace)
dtDnsResolverTracer :: TracersExtra -> Tracer IO (WithDomainName DnsTrace)
dtDnsResolverTracer :: Tracer IO (WithDomainName DnsTrace)
dtDnsResolverTracer
      , Tracer IO (WithAddr SockAddr ErrorPolicyTrace)
dtErrorPolicyTracer :: TracersExtra -> Tracer IO (WithAddr SockAddr ErrorPolicyTrace)
dtErrorPolicyTracer :: Tracer IO (WithAddr SockAddr ErrorPolicyTrace)
dtErrorPolicyTracer
      , Tracer IO (WithAddr LocalAddress ErrorPolicyTrace)
dtLocalErrorPolicyTracer :: TracersExtra -> Tracer IO (WithAddr LocalAddress ErrorPolicyTrace)
dtLocalErrorPolicyTracer :: Tracer IO (WithAddr LocalAddress ErrorPolicyTrace)
dtLocalErrorPolicyTracer
      , Tracer IO AcceptConnectionsPolicyTrace
dtAcceptPolicyTracer :: TracersExtra -> Tracer IO AcceptConnectionsPolicyTrace
dtAcceptPolicyTracer :: Tracer IO AcceptConnectionsPolicyTrace
dtAcceptPolicyTracer
      }
    Arguments
      { Maybe (Either Socket SockAddr)
daIPv4Address :: Maybe (Either Socket SockAddr)
daIPv4Address :: forall (m :: * -> *) ntnFd ntnAddr ntcFd ntcAddr.
Arguments m ntnFd ntnAddr ntcFd ntcAddr
-> Maybe (Either ntnFd ntnAddr)
daIPv4Address
      , Maybe (Either Socket SockAddr)
daIPv6Address :: Maybe (Either Socket SockAddr)
daIPv6Address :: forall (m :: * -> *) ntnFd ntnAddr ntcFd ntcAddr.
Arguments m ntnFd ntnAddr ntcFd ntcAddr
-> Maybe (Either ntnFd ntnAddr)
daIPv6Address
      , Maybe (Either LocalSocket LocalAddress)
daLocalAddress :: Maybe (Either LocalSocket LocalAddress)
daLocalAddress :: forall (m :: * -> *) ntnFd ntnAddr ntcFd ntcAddr.
Arguments m ntnFd ntnAddr ntcFd ntcAddr
-> Maybe (Either ntcFd ntcAddr)
daLocalAddress
      , AcceptedConnectionsLimit
daAcceptedConnectionsLimit :: AcceptedConnectionsLimit
daAcceptedConnectionsLimit :: forall (m :: * -> *) ntnFd ntnAddr ntcFd ntcAddr.
Arguments m ntnFd ntnAddr ntcFd ntcAddr -> AcceptedConnectionsLimit
daAcceptedConnectionsLimit
      , daMode :: forall (m :: * -> *) ntnFd ntnAddr ntcFd ntcAddr.
Arguments m ntnFd ntnAddr ntcFd ntcAddr -> DiffusionMode
daMode = DiffusionMode
diffusionMode
      }
     ArgumentsExtra
       { IPSubscriptionTarget
daIpProducers :: ArgumentsExtra -> IPSubscriptionTarget
daIpProducers :: IPSubscriptionTarget
daIpProducers
       , [DnsSubscriptionTarget]
daDnsProducers :: ArgumentsExtra -> [DnsSubscriptionTarget]
daDnsProducers :: [DnsSubscriptionTarget]
daDnsProducers
       }
    Applications
  SockAddr
  NodeToNodeVersion
  NodeToNodeVersionData
  LocalAddress
  NodeToClientVersion
  NodeToClientVersionData
  IO
  a
applications
    ApplicationsExtra
      { ErrorPolicies
daErrorPolicies :: ApplicationsExtra -> ErrorPolicies
daErrorPolicies :: ErrorPolicies
daErrorPolicies } =
  IO () -> IO ()
forall a. IO a -> IO a
traceException (IO () -> IO ())
-> ((IOManager -> IO ()) -> IO ()) -> (IOManager -> IO ()) -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (IOManager -> IO ()) -> IO ()
WithIOManager
withIOManager ((IOManager -> IO ()) -> IO ()) -> (IOManager -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \IOManager
iocp -> do
    let -- snocket for remote communication.
        snocket :: SocketSnocket
        snocket :: SocketSnocket
snocket = IOManager -> SocketSnocket
Snocket.socketSnocket IOManager
iocp
        localSnocket :: LocalSnocket
        localSnocket :: LocalSnocket
localSnocket = IOManager -> LocalSnocket
Snocket.localSnocket IOManager
iocp
        addresses :: [Either Socket SockAddr]
addresses = Maybe (Either Socket SockAddr) -> [Either Socket SockAddr]
forall a. Maybe a -> [a]
maybeToList Maybe (Either Socket SockAddr)
daIPv4Address
                 [Either Socket SockAddr]
-> [Either Socket SockAddr] -> [Either Socket SockAddr]
forall a. [a] -> [a] -> [a]
++ Maybe (Either Socket SockAddr) -> [Either Socket SockAddr]
forall a. Maybe a -> [a]
maybeToList Maybe (Either Socket SockAddr)
daIPv6Address

    -- networking mutable state
    networkState <- IO (NetworkMutableState SockAddr)
forall addr. IO (NetworkMutableState addr)
newNetworkMutableState
    networkLocalState <- newNetworkMutableState

    lias <- getInitiatorLocalAddresses snocket

    let
        dnsSubActions = SocketSnocket
-> NetworkMutableState SockAddr
-> LocalAddresses SockAddr
-> DnsSubscriptionTarget
-> IO ()
runDnsSubscriptionWorker SocketSnocket
snocket NetworkMutableState SockAddr
networkState LocalAddresses SockAddr
lias
          (DnsSubscriptionTarget -> IO ())
-> [DnsSubscriptionTarget] -> [IO ()]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [DnsSubscriptionTarget]
daDnsProducers

        serverActions = case DiffusionMode
diffusionMode of
          DiffusionMode
InitiatorAndResponderDiffusionMode ->
            SocketSnocket
-> NetworkMutableState SockAddr -> Either Socket SockAddr -> IO ()
runServer SocketSnocket
snocket NetworkMutableState SockAddr
networkState (Either Socket SockAddr -> IO ())
-> [Either Socket SockAddr] -> [IO ()]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Either Socket SockAddr]
addresses
          DiffusionMode
InitiatorOnlyDiffusionMode -> []

        localServerAction = LocalSnocket
-> NetworkMutableState LocalAddress
-> Either LocalSocket LocalAddress
-> IO ()
runLocalServer LocalSnocket
localSnocket NetworkMutableState LocalAddress
networkLocalState
          (Either LocalSocket LocalAddress -> IO ())
-> [Either LocalSocket LocalAddress] -> [IO ()]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (Either LocalSocket LocalAddress)
-> [Either LocalSocket LocalAddress]
forall a. Maybe a -> [a]
maybeToList Maybe (Either LocalSocket LocalAddress)
daLocalAddress

        actions =
          [ -- clean state thread
            NetworkMutableState SockAddr -> IO ()
forall addr. NetworkMutableState addr -> IO ()
cleanNetworkMutableState NetworkMutableState SockAddr
networkState
          , -- clean local state thread
            NetworkMutableState LocalAddress -> IO ()
forall addr. NetworkMutableState addr -> IO ()
cleanNetworkMutableState NetworkMutableState LocalAddress
networkLocalState
          , -- fork ip subscription
            SocketSnocket
-> NetworkMutableState SockAddr -> LocalAddresses SockAddr -> IO ()
runIpSubscriptionWorker SocketSnocket
snocket NetworkMutableState SockAddr
networkState LocalAddresses SockAddr
lias
          ]
          -- fork dns subscriptions
          [IO ()] -> [IO ()] -> [IO ()]
forall a. [a] -> [a] -> [a]
++ [IO ()]
dnsSubActions
          -- fork servers for remote peers
          [IO ()] -> [IO ()] -> [IO ()]
forall a. [a] -> [a] -> [a]
++ [IO ()]
serverActions
          -- fork server for local clients
          [IO ()] -> [IO ()] -> [IO ()]
forall a. [a] -> [a] -> [a]
++ [IO ()]
localServerAction

    -- Runs all threads in parallel, using Async.Concurrently's Alternative instance
    Async.runConcurrently $ asum $ Async.Concurrently <$> actions

  where
    traceException :: IO a -> IO a
    traceException :: forall a. IO a -> IO a
traceException IO a
f = (SomeException -> Maybe SomeException)
-> IO a -> (SomeException -> IO a) -> IO a
forall e b a.
Exception e =>
(e -> Maybe b) -> IO a -> (b -> IO a) -> IO a
catchJust
        (\SomeException
e -> case SomeException -> Maybe ExitCode
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e :: Maybe ExitCode of
            Maybe ExitCode
Nothing -> SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just SomeException
e
            Just {} -> Maybe SomeException
forall a. Maybe a
Nothing)
        IO a
f ((SomeException -> IO a) -> IO a)
-> (SomeException -> IO a) -> IO a
forall a b. (a -> b) -> a -> b
$ \(SomeException
e :: SomeException) -> do
            Tracer IO (DiffusionTracer SockAddr LocalAddress)
-> DiffusionTracer SockAddr LocalAddress -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO (DiffusionTracer SockAddr LocalAddress)
dtDiffusionTracer (SomeException -> DiffusionTracer SockAddr LocalAddress
forall ntnAddr ntcAddr.
SomeException -> DiffusionTracer ntnAddr ntcAddr
DiffusionErrored SomeException
e)
            Failure -> IO a
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO (SomeException -> Failure
DiffusionError SomeException
e)

    --
    -- We can't share portnumber with our server since we run separate
    -- 'MuxInitiatorApplication' and 'MuxResponderApplication'
    -- applications instead of a 'MuxInitiatorAndResponderApplication'.
    -- This means we don't utilise full duplex connection.
    getInitiatorLocalAddresses :: SocketSnocket -> IO (LocalAddresses SockAddr)
    getInitiatorLocalAddresses :: SocketSnocket -> IO (LocalAddresses SockAddr)
getInitiatorLocalAddresses SocketSnocket
sn = do
        localIpv4 <-
          case Maybe (Either Socket SockAddr)
daIPv4Address of
              Just (Right SockAddr
ipv4) -> do
                LocalAddresses SockAddr -> IO (LocalAddresses SockAddr)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return LocalAddresses
                  { laIpv4 :: Maybe SockAddr
laIpv4 = SockAddr -> Maybe SockAddr
anyIPv4Addr SockAddr
ipv4
                  , laIpv6 :: Maybe SockAddr
laIpv6 = Maybe SockAddr
forall a. Maybe a
Nothing
                  , laUnix :: Maybe SockAddr
laUnix = Maybe SockAddr
forall a. Maybe a
Nothing
                  }

              Just (Left Socket
ipv4Sock) -> do
                 ipv4Addrs <- SocketSnocket -> Socket -> IO SockAddr
forall (m :: * -> *) fd addr. Snocket m fd addr -> fd -> m addr
Snocket.getLocalAddr SocketSnocket
sn Socket
ipv4Sock
                 return LocalAddresses
                   { laIpv4 = anyIPv4Addr ipv4Addrs
                   , laIpv6 = Nothing
                   , laUnix = Nothing
                   }

              Maybe (Either Socket SockAddr)
Nothing -> do
                 LocalAddresses SockAddr -> IO (LocalAddresses SockAddr)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return LocalAddresses
                   { laIpv4 :: Maybe SockAddr
laIpv4 = Maybe SockAddr
forall a. Maybe a
Nothing
                   , laIpv6 :: Maybe SockAddr
laIpv6 = Maybe SockAddr
forall a. Maybe a
Nothing
                   , laUnix :: Maybe SockAddr
laUnix = Maybe SockAddr
forall a. Maybe a
Nothing
                   }

        localIpv6 <-
          case daIPv6Address of
            Just (Right SockAddr
ipv6) -> do
              LocalAddresses SockAddr -> IO (LocalAddresses SockAddr)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return LocalAddresses
                { laIpv4 :: Maybe SockAddr
laIpv4 = Maybe SockAddr
forall a. Maybe a
Nothing
                , laIpv6 :: Maybe SockAddr
laIpv6 = SockAddr -> Maybe SockAddr
anyIPv6Addr SockAddr
ipv6
                , laUnix :: Maybe SockAddr
laUnix = Maybe SockAddr
forall a. Maybe a
Nothing
                }

            Just (Left Socket
ipv6Sock) -> do
              ipv6Addrs <- SocketSnocket -> Socket -> IO SockAddr
forall (m :: * -> *) fd addr. Snocket m fd addr -> fd -> m addr
Snocket.getLocalAddr SocketSnocket
sn Socket
ipv6Sock
              return LocalAddresses
                { laIpv4 = Nothing
                , laIpv6 = anyIPv6Addr ipv6Addrs
                , laUnix = Nothing
                }

            Maybe (Either Socket SockAddr)
Nothing -> do
              LocalAddresses SockAddr -> IO (LocalAddresses SockAddr)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return LocalAddresses
                { laIpv4 :: Maybe SockAddr
laIpv4 = Maybe SockAddr
forall a. Maybe a
Nothing
                , laIpv6 :: Maybe SockAddr
laIpv6 = Maybe SockAddr
forall a. Maybe a
Nothing
                , laUnix :: Maybe SockAddr
laUnix = Maybe SockAddr
forall a. Maybe a
Nothing
                  }

        return (localIpv4 <> localIpv6)
      where
        -- Return an IPv4 address with an ephemeral port number if we use IPv4
        anyIPv4Addr :: SockAddr -> Maybe SockAddr
        anyIPv4Addr :: SockAddr -> Maybe SockAddr
anyIPv4Addr Socket.SockAddrInet {} = SockAddr -> Maybe SockAddr
forall a. a -> Maybe a
Just (PortNumber -> FlowInfo -> SockAddr
Socket.SockAddrInet PortNumber
0 FlowInfo
0)
        anyIPv4Addr SockAddr
_                      = Maybe SockAddr
forall a. Maybe a
Nothing

        -- Return an IPv6 address with an ephemeral port number if we use IPv6
        anyIPv6Addr :: SockAddr -> Maybe SockAddr
        anyIPv6Addr :: SockAddr -> Maybe SockAddr
anyIPv6Addr Socket.SockAddrInet6 {} =
          SockAddr -> Maybe SockAddr
forall a. a -> Maybe a
Just (PortNumber -> FlowInfo -> HostAddress6 -> FlowInfo -> SockAddr
Socket.SockAddrInet6 PortNumber
0 FlowInfo
0 (FlowInfo
0, FlowInfo
0, FlowInfo
0, FlowInfo
0) FlowInfo
0)
        anyIPv6Addr SockAddr
_ = Maybe SockAddr
forall a. Maybe a
Nothing

    remoteErrorPolicy, localErrorPolicy :: ErrorPolicies
    remoteErrorPolicy :: ErrorPolicies
remoteErrorPolicy = ErrorPolicies
NodeToNode.remoteNetworkErrorPolicy ErrorPolicies -> ErrorPolicies -> ErrorPolicies
forall a. Semigroup a => a -> a -> a
<> ErrorPolicies
daErrorPolicies
    localErrorPolicy :: ErrorPolicies
localErrorPolicy  = ErrorPolicies
NodeToNode.localNetworkErrorPolicy ErrorPolicies -> ErrorPolicies -> ErrorPolicies
forall a. Semigroup a => a -> a -> a
<> ErrorPolicies
daErrorPolicies

    runLocalServer :: LocalSnocket
                   -> NetworkMutableState LocalAddress
                   -> Either LocalSocket  LocalAddress
                   -> IO ()
    runLocalServer :: LocalSnocket
-> NetworkMutableState LocalAddress
-> Either LocalSocket LocalAddress
-> IO ()
runLocalServer LocalSnocket
sn NetworkMutableState LocalAddress
networkLocalState Either LocalSocket LocalAddress
localAddress =
      IO LocalSocket
-> (LocalSocket -> IO ()) -> (LocalSocket -> IO ()) -> IO ()
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket
        IO LocalSocket
localServerInit
        LocalSocket -> IO ()
localServerCleanup
        LocalSocket -> IO ()
localServerBody
      where
        localServerInit :: IO LocalSocket
        localServerInit :: IO LocalSocket
localServerInit =
          case Either LocalSocket LocalAddress
localAddress of
#if defined(mingw32_HOST_OS)
            -- Windows uses named pipes so can't take advantage of existing sockets
            Left _ -> do
              traceWith dtDiffusionTracer UnsupportedReadySocketCase
              throwIO UnsupportedReadySocket
#else
            Left LocalSocket
sd -> do
              addr <- LocalSnocket -> LocalSocket -> IO LocalAddress
forall (m :: * -> *) fd addr. Snocket m fd addr -> fd -> m addr
Snocket.getLocalAddr LocalSnocket
sn LocalSocket
sd
              traceWith dtDiffusionTracer
                $ UsingSystemdSocket addr
              return sd
#endif
            Right LocalAddress
addr -> do
              Tracer IO (DiffusionTracer SockAddr LocalAddress)
-> DiffusionTracer SockAddr LocalAddress -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO (DiffusionTracer SockAddr LocalAddress)
dtDiffusionTracer
                (DiffusionTracer SockAddr LocalAddress -> IO ())
-> DiffusionTracer SockAddr LocalAddress -> IO ()
forall a b. (a -> b) -> a -> b
$ LocalAddress -> DiffusionTracer SockAddr LocalAddress
forall ntnAddr ntcAddr. ntcAddr -> DiffusionTracer ntnAddr ntcAddr
CreateSystemdSocketForSnocketPath LocalAddress
addr
              sd <- LocalSnocket -> AddressFamily LocalAddress -> IO LocalSocket
forall (m :: * -> *) fd addr.
Snocket m fd addr -> AddressFamily addr -> m fd
Snocket.open
                    LocalSnocket
sn
                    (LocalSnocket -> LocalAddress -> AddressFamily LocalAddress
forall (m :: * -> *) fd addr.
Snocket m fd addr -> addr -> AddressFamily addr
Snocket.addrFamily LocalSnocket
sn LocalAddress
addr)
              traceWith dtDiffusionTracer
                $ CreatedLocalSocket addr
              return sd

        -- We close the socket here, even if it was provided for us.
        localServerCleanup :: LocalSocket -> IO ()
        localServerCleanup :: LocalSocket -> IO ()
localServerCleanup = LocalSnocket -> LocalSocket -> IO ()
forall (m :: * -> *) fd addr. Snocket m fd addr -> fd -> m ()
Snocket.close LocalSnocket
sn

        localServerBody :: LocalSocket -> IO ()
        localServerBody :: LocalSocket -> IO ()
localServerBody LocalSocket
sd = do
          case Either LocalSocket LocalAddress
localAddress of
               -- If a socket was provided it should be ready to accept
               Left LocalSocket
_ -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
               Right LocalAddress
addr -> do
                 Tracer IO (DiffusionTracer SockAddr LocalAddress)
-> DiffusionTracer SockAddr LocalAddress -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO (DiffusionTracer SockAddr LocalAddress)
dtDiffusionTracer
                  (DiffusionTracer SockAddr LocalAddress -> IO ())
-> (FileDescriptor -> DiffusionTracer SockAddr LocalAddress)
-> FileDescriptor
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. LocalAddress
-> FileDescriptor -> DiffusionTracer SockAddr LocalAddress
forall ntnAddr ntcAddr.
ntcAddr -> FileDescriptor -> DiffusionTracer ntnAddr ntcAddr
ConfiguringLocalSocket LocalAddress
addr
                    (FileDescriptor -> IO ()) -> IO FileDescriptor -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< LocalSocket -> IO FileDescriptor
localSocketFileDescriptor LocalSocket
sd

                 LocalSnocket -> LocalSocket -> LocalAddress -> IO ()
forall (m :: * -> *) fd addr.
Snocket m fd addr -> fd -> addr -> m ()
Snocket.bind LocalSnocket
sn LocalSocket
sd LocalAddress
addr

                 Tracer IO (DiffusionTracer SockAddr LocalAddress)
-> DiffusionTracer SockAddr LocalAddress -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO (DiffusionTracer SockAddr LocalAddress)
dtDiffusionTracer
                  (DiffusionTracer SockAddr LocalAddress -> IO ())
-> (FileDescriptor -> DiffusionTracer SockAddr LocalAddress)
-> FileDescriptor
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. LocalAddress
-> FileDescriptor -> DiffusionTracer SockAddr LocalAddress
forall ntnAddr ntcAddr.
ntcAddr -> FileDescriptor -> DiffusionTracer ntnAddr ntcAddr
ListeningLocalSocket LocalAddress
addr
                    (FileDescriptor -> IO ()) -> IO FileDescriptor -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< LocalSocket -> IO FileDescriptor
localSocketFileDescriptor LocalSocket
sd

                 LocalSnocket -> LocalSocket -> IO ()
forall (m :: * -> *) fd addr. Snocket m fd addr -> fd -> m ()
Snocket.listen LocalSnocket
sn LocalSocket
sd

                 Tracer IO (DiffusionTracer SockAddr LocalAddress)
-> DiffusionTracer SockAddr LocalAddress -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO (DiffusionTracer SockAddr LocalAddress)
dtDiffusionTracer
                  (DiffusionTracer SockAddr LocalAddress -> IO ())
-> (FileDescriptor -> DiffusionTracer SockAddr LocalAddress)
-> FileDescriptor
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. LocalAddress
-> FileDescriptor -> DiffusionTracer SockAddr LocalAddress
forall ntnAddr ntcAddr.
ntcAddr -> FileDescriptor -> DiffusionTracer ntnAddr ntcAddr
LocalSocketUp LocalAddress
addr
                    (FileDescriptor -> IO ()) -> IO FileDescriptor -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< LocalSocket -> IO FileDescriptor
localSocketFileDescriptor LocalSocket
sd

          Tracer IO (DiffusionTracer SockAddr LocalAddress)
-> DiffusionTracer SockAddr LocalAddress -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO (DiffusionTracer SockAddr LocalAddress)
dtDiffusionTracer
            (DiffusionTracer SockAddr LocalAddress -> IO ())
-> (LocalAddress -> DiffusionTracer SockAddr LocalAddress)
-> LocalAddress
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. LocalAddress -> DiffusionTracer SockAddr LocalAddress
forall ntnAddr ntcAddr. ntcAddr -> DiffusionTracer ntnAddr ntcAddr
RunLocalServer (LocalAddress -> IO ()) -> IO LocalAddress -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< LocalSnocket -> LocalSocket -> IO LocalAddress
forall (m :: * -> *) fd addr. Snocket m fd addr -> fd -> m addr
Snocket.getLocalAddr LocalSnocket
sn LocalSocket
sd

          IO Void -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Void -> IO ()) -> IO Void -> IO ()
forall a b. (a -> b) -> a -> b
$ LocalSnocket
-> NetworkServerTracers LocalAddress NodeToClientVersion
-> NetworkMutableState LocalAddress
-> LocalSocket
-> Versions
     NodeToClientVersion
     NodeToClientVersionData
     (OuroborosApplicationWithMinimalCtx
        'ResponderMode LocalAddress ByteString IO Void ())
-> ErrorPolicies
-> IO Void
forall a b.
LocalSnocket
-> NetworkServerTracers LocalAddress NodeToClientVersion
-> NetworkMutableState LocalAddress
-> LocalSocket
-> Versions
     NodeToClientVersion
     NodeToClientVersionData
     (OuroborosApplicationWithMinimalCtx
        'ResponderMode LocalAddress ByteString IO a b)
-> ErrorPolicies
-> IO Void
NodeToClient.withServer
            LocalSnocket
sn
            (Tracer IO (WithMuxBearer (ConnectionId LocalAddress) MuxTrace)
-> Tracer IO (HandshakeTr LocalAddress NodeToClientVersion)
-> Tracer IO (WithAddr LocalAddress ErrorPolicyTrace)
-> Tracer IO AcceptConnectionsPolicyTrace
-> NetworkServerTracers LocalAddress NodeToClientVersion
forall addr vNumber.
Tracer IO (WithMuxBearer (ConnectionId addr) MuxTrace)
-> Tracer
     IO
     (WithMuxBearer
        (ConnectionId addr) (TraceSendRecv (Handshake vNumber Term)))
-> Tracer IO (WithAddr addr ErrorPolicyTrace)
-> Tracer IO AcceptConnectionsPolicyTrace
-> NetworkServerTracers addr vNumber
NetworkServerTracers
              Tracer IO (WithMuxBearer (ConnectionId LocalAddress) MuxTrace)
dtLocalMuxTracer
              Tracer IO (HandshakeTr LocalAddress NodeToClientVersion)
dtLocalHandshakeTracer
              Tracer IO (WithAddr LocalAddress ErrorPolicyTrace)
dtLocalErrorPolicyTracer
              Tracer IO AcceptConnectionsPolicyTrace
dtAcceptPolicyTracer)
            NetworkMutableState LocalAddress
networkLocalState
            LocalSocket
sd
            (Applications
  SockAddr
  NodeToNodeVersion
  NodeToNodeVersionData
  LocalAddress
  NodeToClientVersion
  NodeToClientVersionData
  IO
  a
-> Versions
     NodeToClientVersion
     NodeToClientVersionData
     (OuroborosApplicationWithMinimalCtx
        'ResponderMode LocalAddress ByteString IO Void ())
forall ntnAddr ntnVersion ntnVersionData ntcAddr ntcVersion
       ntcVersionData (m :: * -> *) a.
Applications
  ntnAddr
  ntnVersion
  ntnVersionData
  ntcAddr
  ntcVersion
  ntcVersionData
  m
  a
-> Versions
     ntcVersion
     ntcVersionData
     (OuroborosApplicationWithMinimalCtx
        'ResponderMode ntcAddr ByteString m Void ())
daLocalResponderApplication Applications
  SockAddr
  NodeToNodeVersion
  NodeToNodeVersionData
  LocalAddress
  NodeToClientVersion
  NodeToClientVersionData
  IO
  a
applications)
            ErrorPolicies
localErrorPolicy

    runServer :: SocketSnocket
              -> NetworkMutableState SockAddr
              -> Either Socket.Socket SockAddr
              -> IO ()
    runServer :: SocketSnocket
-> NetworkMutableState SockAddr -> Either Socket SockAddr -> IO ()
runServer SocketSnocket
sn NetworkMutableState SockAddr
networkState Either Socket SockAddr
address =
      IO Socket -> (Socket -> IO ()) -> (Socket -> IO ()) -> IO ()
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket
        (
          case Either Socket SockAddr
address of
               Left Socket
sd -> Socket -> IO Socket
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Socket
sd
               Right SockAddr
addr -> do
                 Tracer IO (DiffusionTracer SockAddr LocalAddress)
-> DiffusionTracer SockAddr LocalAddress -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO (DiffusionTracer SockAddr LocalAddress)
dtDiffusionTracer
                  (DiffusionTracer SockAddr LocalAddress -> IO ())
-> DiffusionTracer SockAddr LocalAddress -> IO ()
forall a b. (a -> b) -> a -> b
$ SockAddr -> DiffusionTracer SockAddr LocalAddress
forall ntnAddr ntcAddr. ntnAddr -> DiffusionTracer ntnAddr ntcAddr
CreatingServerSocket SockAddr
addr
                 SocketSnocket -> AddressFamily SockAddr -> IO Socket
forall (m :: * -> *) fd addr.
Snocket m fd addr -> AddressFamily addr -> m fd
Snocket.open SocketSnocket
sn (SocketSnocket -> SockAddr -> AddressFamily SockAddr
forall (m :: * -> *) fd addr.
Snocket m fd addr -> addr -> AddressFamily addr
Snocket.addrFamily SocketSnocket
sn SockAddr
addr)
        )
        (SocketSnocket -> Socket -> IO ()
forall (m :: * -> *) fd addr. Snocket m fd addr -> fd -> m ()
Snocket.close SocketSnocket
sn) -- We close the socket here, even if it was provided to us.
        (\Socket
sd -> do

          addr <- case Either Socket SockAddr
address of
               -- If a socket was provided it should be ready to accept
               Left Socket
sock -> do
                 addr <- SocketSnocket -> Socket -> IO SockAddr
forall (m :: * -> *) fd addr. Snocket m fd addr -> fd -> m addr
Snocket.getLocalAddr SocketSnocket
sn Socket
sock
                 configureSystemdSocket
                   (SystemdSocketConfiguration `contramap` dtDiffusionTracer)
                   sd addr
                 Snocket.getLocalAddr sn sd
               Right SockAddr
addr -> do
                 Tracer IO (DiffusionTracer SockAddr LocalAddress)
-> DiffusionTracer SockAddr LocalAddress -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO (DiffusionTracer SockAddr LocalAddress)
dtDiffusionTracer
                  (DiffusionTracer SockAddr LocalAddress -> IO ())
-> DiffusionTracer SockAddr LocalAddress -> IO ()
forall a b. (a -> b) -> a -> b
$ SockAddr -> DiffusionTracer SockAddr LocalAddress
forall ntnAddr ntcAddr. ntnAddr -> DiffusionTracer ntnAddr ntcAddr
ConfiguringServerSocket SockAddr
addr
                 Socket -> Maybe SockAddr -> IO ()
configureSocket Socket
sd (SockAddr -> Maybe SockAddr
forall a. a -> Maybe a
Just SockAddr
addr)
                 SocketSnocket -> Socket -> SockAddr -> IO ()
forall (m :: * -> *) fd addr.
Snocket m fd addr -> fd -> addr -> m ()
Snocket.bind SocketSnocket
sn Socket
sd SockAddr
addr
                 Tracer IO (DiffusionTracer SockAddr LocalAddress)
-> DiffusionTracer SockAddr LocalAddress -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO (DiffusionTracer SockAddr LocalAddress)
dtDiffusionTracer
                  (DiffusionTracer SockAddr LocalAddress -> IO ())
-> DiffusionTracer SockAddr LocalAddress -> IO ()
forall a b. (a -> b) -> a -> b
$ SockAddr -> DiffusionTracer SockAddr LocalAddress
forall ntnAddr ntcAddr. ntnAddr -> DiffusionTracer ntnAddr ntcAddr
ListeningServerSocket SockAddr
addr
                 SocketSnocket -> Socket -> IO ()
forall (m :: * -> *) fd addr. Snocket m fd addr -> fd -> m ()
Snocket.listen SocketSnocket
sn Socket
sd
                 Tracer IO (DiffusionTracer SockAddr LocalAddress)
-> DiffusionTracer SockAddr LocalAddress -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO (DiffusionTracer SockAddr LocalAddress)
dtDiffusionTracer
                  (DiffusionTracer SockAddr LocalAddress -> IO ())
-> DiffusionTracer SockAddr LocalAddress -> IO ()
forall a b. (a -> b) -> a -> b
$ SockAddr -> DiffusionTracer SockAddr LocalAddress
forall ntnAddr ntcAddr. ntnAddr -> DiffusionTracer ntnAddr ntcAddr
ServerSocketUp SockAddr
addr
                 SockAddr -> IO SockAddr
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return SockAddr
addr

          traceWith dtDiffusionTracer $ RunServer (pure addr)

          void $ NodeToNode.withServer
            sn
            (NetworkServerTracers
              dtMuxTracer
              dtHandshakeTracer
              dtErrorPolicyTracer
              dtAcceptPolicyTracer)
            networkState
            daAcceptedConnectionsLimit
            sd
            -- NonP2P does not use Peer Sharing so the callback is set to return
            -- [].
            (mkResponderApp
              <$> daApplicationInitiatorResponderMode
                    applications)
            remoteErrorPolicy
        )
    runIpSubscriptionWorker :: SocketSnocket
                            -> NetworkMutableState SockAddr
                            -> LocalAddresses SockAddr
                            -> IO ()
    runIpSubscriptionWorker :: SocketSnocket
-> NetworkMutableState SockAddr -> LocalAddresses SockAddr -> IO ()
runIpSubscriptionWorker SocketSnocket
sn NetworkMutableState SockAddr
networkState LocalAddresses SockAddr
la =
      IO Void -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void
        (IO Void -> IO ()) -> IO Void -> IO ()
forall a b. (a -> b) -> a -> b
$ SocketSnocket
-> NetworkIPSubscriptionTracers SockAddr NodeToNodeVersion
-> NetworkMutableState SockAddr
-> IPSubscriptionParams ()
-> Versions
     NodeToNodeVersion
     NodeToNodeVersionData
     (OuroborosApplicationWithMinimalCtx
        'InitiatorMode SockAddr ByteString IO a Void)
-> IO Void
forall (mode :: MuxMode) x y.
(HasInitiator mode ~ 'True) =>
SocketSnocket
-> NetworkIPSubscriptionTracers SockAddr NodeToNodeVersion
-> NetworkMutableState SockAddr
-> IPSubscriptionParams ()
-> Versions
     NodeToNodeVersion
     NodeToNodeVersionData
     (OuroborosApplicationWithMinimalCtx
        mode SockAddr ByteString IO x y)
-> IO Void
NodeToNode.ipSubscriptionWorker
            SocketSnocket
sn
            (Tracer IO (WithMuxBearer (ConnectionId SockAddr) MuxTrace)
-> Tracer IO (HandshakeTr SockAddr NodeToNodeVersion)
-> Tracer IO (WithAddr SockAddr ErrorPolicyTrace)
-> Tracer IO (WithIPList (SubscriptionTrace SockAddr))
-> NetworkIPSubscriptionTracers SockAddr NodeToNodeVersion
forall (withIPList :: * -> *) addr vNumber.
Tracer IO (WithMuxBearer (ConnectionId addr) MuxTrace)
-> Tracer
     IO
     (WithMuxBearer
        (ConnectionId addr) (TraceSendRecv (Handshake vNumber Term)))
-> Tracer IO (WithAddr addr ErrorPolicyTrace)
-> Tracer IO (withIPList (SubscriptionTrace addr))
-> NetworkSubscriptionTracers withIPList addr vNumber
NetworkSubscriptionTracers
              Tracer IO (WithMuxBearer (ConnectionId SockAddr) MuxTrace)
dtMuxTracer
              Tracer IO (HandshakeTr SockAddr NodeToNodeVersion)
dtHandshakeTracer
              Tracer IO (WithAddr SockAddr ErrorPolicyTrace)
dtErrorPolicyTracer
              Tracer IO (WithIPList (SubscriptionTrace SockAddr))
dtIpSubscriptionTracer)
            NetworkMutableState SockAddr
networkState
            SubscriptionParams
              { spLocalAddresses :: LocalAddresses SockAddr
spLocalAddresses         = LocalAddresses SockAddr
la
              , spConnectionAttemptDelay :: SockAddr -> Maybe DiffTime
spConnectionAttemptDelay = Maybe DiffTime -> SockAddr -> Maybe DiffTime
forall a b. a -> b -> a
const Maybe DiffTime
forall a. Maybe a
Nothing
              , spErrorPolicies :: ErrorPolicies
spErrorPolicies          = ErrorPolicies
remoteErrorPolicy
              , spSubscriptionTarget :: IPSubscriptionTarget
spSubscriptionTarget     = IPSubscriptionTarget
daIpProducers
              }
            ((MinimalInitiatorContext SockAddr
 -> ExpandedInitiatorContext SockAddr IO)
-> OuroborosApplication
     'InitiatorMode
     (ExpandedInitiatorContext SockAddr IO)
     (ResponderContext SockAddr)
     ByteString
     IO
     a
     Void
-> OuroborosApplicationWithMinimalCtx
     'InitiatorMode SockAddr ByteString IO a Void
forall initiatorCtx' initiatorCtx (mode :: MuxMode) responderCtx
       bytes (m :: * -> *) a b.
(initiatorCtx' -> initiatorCtx)
-> OuroborosApplication mode initiatorCtx responderCtx bytes m a b
-> OuroborosApplication mode initiatorCtx' responderCtx bytes m a b
contramapInitiatorCtx MinimalInitiatorContext SockAddr
-> ExpandedInitiatorContext SockAddr IO
expandContext (OuroborosApplication
   'InitiatorMode
   (ExpandedInitiatorContext SockAddr IO)
   (ResponderContext SockAddr)
   ByteString
   IO
   a
   Void
 -> OuroborosApplicationWithMinimalCtx
      'InitiatorMode SockAddr ByteString IO a Void)
-> (OuroborosBundle
      'InitiatorMode
      (ExpandedInitiatorContext SockAddr IO)
      (ResponderContext SockAddr)
      ByteString
      IO
      a
      Void
    -> OuroborosApplication
         'InitiatorMode
         (ExpandedInitiatorContext SockAddr IO)
         (ResponderContext SockAddr)
         ByteString
         IO
         a
         Void)
-> OuroborosBundle
     'InitiatorMode
     (ExpandedInitiatorContext SockAddr IO)
     (ResponderContext SockAddr)
     ByteString
     IO
     a
     Void
-> OuroborosApplicationWithMinimalCtx
     'InitiatorMode SockAddr ByteString IO a Void
forall b c a. (b -> c) -> (a -> b) -> a -> c
. OuroborosBundle
  'InitiatorMode
  (ExpandedInitiatorContext SockAddr IO)
  (ResponderContext SockAddr)
  ByteString
  IO
  a
  Void
-> OuroborosApplication
     'InitiatorMode
     (ExpandedInitiatorContext SockAddr IO)
     (ResponderContext SockAddr)
     ByteString
     IO
     a
     Void
forall (mode :: MuxMode) initiatorCtx responderCtx bytes
       (m :: * -> *) a b.
OuroborosBundle mode initiatorCtx responderCtx bytes m a b
-> OuroborosApplication mode initiatorCtx responderCtx bytes m a b
fromOuroborosBundle
              (OuroborosBundle
   'InitiatorMode
   (ExpandedInitiatorContext SockAddr IO)
   (ResponderContext SockAddr)
   ByteString
   IO
   a
   Void
 -> OuroborosApplicationWithMinimalCtx
      'InitiatorMode SockAddr ByteString IO a Void)
-> Versions
     NodeToNodeVersion
     NodeToNodeVersionData
     (OuroborosBundle
        'InitiatorMode
        (ExpandedInitiatorContext SockAddr IO)
        (ResponderContext SockAddr)
        ByteString
        IO
        a
        Void)
-> Versions
     NodeToNodeVersion
     NodeToNodeVersionData
     (OuroborosApplicationWithMinimalCtx
        'InitiatorMode SockAddr ByteString IO a Void)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Applications
  SockAddr
  NodeToNodeVersion
  NodeToNodeVersionData
  LocalAddress
  NodeToClientVersion
  NodeToClientVersionData
  IO
  a
-> Versions
     NodeToNodeVersion
     NodeToNodeVersionData
     (OuroborosBundle
        'InitiatorMode
        (ExpandedInitiatorContext SockAddr IO)
        (ResponderContext SockAddr)
        ByteString
        IO
        a
        Void)
forall ntnAddr ntnVersion ntnVersionData ntcAddr ntcVersion
       ntcVersionData (m :: * -> *) a.
Applications
  ntnAddr
  ntnVersion
  ntnVersionData
  ntcAddr
  ntcVersion
  ntcVersionData
  m
  a
-> Versions
     ntnVersion
     ntnVersionData
     (OuroborosBundleWithExpandedCtx
        'InitiatorMode ntnAddr ByteString m a Void)
daApplicationInitiatorMode Applications
  SockAddr
  NodeToNodeVersion
  NodeToNodeVersionData
  LocalAddress
  NodeToClientVersion
  NodeToClientVersionData
  IO
  a
applications)

    runDnsSubscriptionWorker :: SocketSnocket
                             -> NetworkMutableState SockAddr
                             -> LocalAddresses SockAddr
                             -> DnsSubscriptionTarget
                             -> IO ()
    runDnsSubscriptionWorker :: SocketSnocket
-> NetworkMutableState SockAddr
-> LocalAddresses SockAddr
-> DnsSubscriptionTarget
-> IO ()
runDnsSubscriptionWorker SocketSnocket
sn NetworkMutableState SockAddr
networkState LocalAddresses SockAddr
la DnsSubscriptionTarget
dnsProducer =
      IO Void -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void
        (IO Void -> IO ()) -> IO Void -> IO ()
forall a b. (a -> b) -> a -> b
$ SocketSnocket
-> NetworkDNSSubscriptionTracers NodeToNodeVersion SockAddr
-> NetworkMutableState SockAddr
-> DnsSubscriptionParams ()
-> Versions
     NodeToNodeVersion
     NodeToNodeVersionData
     (OuroborosApplicationWithMinimalCtx
        'InitiatorMode SockAddr ByteString IO a Void)
-> IO Void
forall (mode :: MuxMode) x y.
(HasInitiator mode ~ 'True) =>
SocketSnocket
-> NetworkDNSSubscriptionTracers NodeToNodeVersion SockAddr
-> NetworkMutableState SockAddr
-> DnsSubscriptionParams ()
-> Versions
     NodeToNodeVersion
     NodeToNodeVersionData
     (OuroborosApplicationWithMinimalCtx
        mode SockAddr ByteString IO x y)
-> IO Void
NodeToNode.dnsSubscriptionWorker
            SocketSnocket
sn
            (Tracer IO (WithMuxBearer (ConnectionId SockAddr) MuxTrace)
-> Tracer IO (HandshakeTr SockAddr NodeToNodeVersion)
-> Tracer IO (WithAddr SockAddr ErrorPolicyTrace)
-> Tracer IO (WithDomainName (SubscriptionTrace SockAddr))
-> Tracer IO (WithDomainName DnsTrace)
-> NetworkDNSSubscriptionTracers NodeToNodeVersion SockAddr
forall vNumber addr.
Tracer IO (WithMuxBearer (ConnectionId addr) MuxTrace)
-> Tracer
     IO
     (WithMuxBearer
        (ConnectionId addr) (TraceSendRecv (Handshake vNumber Term)))
-> Tracer IO (WithAddr addr ErrorPolicyTrace)
-> Tracer IO (WithDomainName (SubscriptionTrace addr))
-> Tracer IO (WithDomainName DnsTrace)
-> NetworkDNSSubscriptionTracers vNumber addr
NetworkDNSSubscriptionTracers
              Tracer IO (WithMuxBearer (ConnectionId SockAddr) MuxTrace)
dtMuxTracer
              Tracer IO (HandshakeTr SockAddr NodeToNodeVersion)
dtHandshakeTracer
              Tracer IO (WithAddr SockAddr ErrorPolicyTrace)
dtErrorPolicyTracer
              Tracer IO (WithDomainName (SubscriptionTrace SockAddr))
dtDnsSubscriptionTracer
              Tracer IO (WithDomainName DnsTrace)
dtDnsResolverTracer)
            NetworkMutableState SockAddr
networkState
            SubscriptionParams
              { spLocalAddresses :: LocalAddresses SockAddr
spLocalAddresses         = LocalAddresses SockAddr
la
              , spConnectionAttemptDelay :: SockAddr -> Maybe DiffTime
spConnectionAttemptDelay = Maybe DiffTime -> SockAddr -> Maybe DiffTime
forall a b. a -> b -> a
const Maybe DiffTime
forall a. Maybe a
Nothing
              , spErrorPolicies :: ErrorPolicies
spErrorPolicies          = ErrorPolicies
remoteErrorPolicy
              , spSubscriptionTarget :: DnsSubscriptionTarget
spSubscriptionTarget     = DnsSubscriptionTarget
dnsProducer
              }
            ((MinimalInitiatorContext SockAddr
 -> ExpandedInitiatorContext SockAddr IO)
-> OuroborosApplication
     'InitiatorMode
     (ExpandedInitiatorContext SockAddr IO)
     (ResponderContext SockAddr)
     ByteString
     IO
     a
     Void
-> OuroborosApplicationWithMinimalCtx
     'InitiatorMode SockAddr ByteString IO a Void
forall initiatorCtx' initiatorCtx (mode :: MuxMode) responderCtx
       bytes (m :: * -> *) a b.
(initiatorCtx' -> initiatorCtx)
-> OuroborosApplication mode initiatorCtx responderCtx bytes m a b
-> OuroborosApplication mode initiatorCtx' responderCtx bytes m a b
contramapInitiatorCtx MinimalInitiatorContext SockAddr
-> ExpandedInitiatorContext SockAddr IO
expandContext (OuroborosApplication
   'InitiatorMode
   (ExpandedInitiatorContext SockAddr IO)
   (ResponderContext SockAddr)
   ByteString
   IO
   a
   Void
 -> OuroborosApplicationWithMinimalCtx
      'InitiatorMode SockAddr ByteString IO a Void)
-> (OuroborosBundle
      'InitiatorMode
      (ExpandedInitiatorContext SockAddr IO)
      (ResponderContext SockAddr)
      ByteString
      IO
      a
      Void
    -> OuroborosApplication
         'InitiatorMode
         (ExpandedInitiatorContext SockAddr IO)
         (ResponderContext SockAddr)
         ByteString
         IO
         a
         Void)
-> OuroborosBundle
     'InitiatorMode
     (ExpandedInitiatorContext SockAddr IO)
     (ResponderContext SockAddr)
     ByteString
     IO
     a
     Void
-> OuroborosApplicationWithMinimalCtx
     'InitiatorMode SockAddr ByteString IO a Void
forall b c a. (b -> c) -> (a -> b) -> a -> c
. OuroborosBundle
  'InitiatorMode
  (ExpandedInitiatorContext SockAddr IO)
  (ResponderContext SockAddr)
  ByteString
  IO
  a
  Void
-> OuroborosApplication
     'InitiatorMode
     (ExpandedInitiatorContext SockAddr IO)
     (ResponderContext SockAddr)
     ByteString
     IO
     a
     Void
forall (mode :: MuxMode) initiatorCtx responderCtx bytes
       (m :: * -> *) a b.
OuroborosBundle mode initiatorCtx responderCtx bytes m a b
-> OuroborosApplication mode initiatorCtx responderCtx bytes m a b
fromOuroborosBundle
              (OuroborosBundle
   'InitiatorMode
   (ExpandedInitiatorContext SockAddr IO)
   (ResponderContext SockAddr)
   ByteString
   IO
   a
   Void
 -> OuroborosApplicationWithMinimalCtx
      'InitiatorMode SockAddr ByteString IO a Void)
-> Versions
     NodeToNodeVersion
     NodeToNodeVersionData
     (OuroborosBundle
        'InitiatorMode
        (ExpandedInitiatorContext SockAddr IO)
        (ResponderContext SockAddr)
        ByteString
        IO
        a
        Void)
-> Versions
     NodeToNodeVersion
     NodeToNodeVersionData
     (OuroborosApplicationWithMinimalCtx
        'InitiatorMode SockAddr ByteString IO a Void)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Applications
  SockAddr
  NodeToNodeVersion
  NodeToNodeVersionData
  LocalAddress
  NodeToClientVersion
  NodeToClientVersionData
  IO
  a
-> Versions
     NodeToNodeVersion
     NodeToNodeVersionData
     (OuroborosBundle
        'InitiatorMode
        (ExpandedInitiatorContext SockAddr IO)
        (ResponderContext SockAddr)
        ByteString
        IO
        a
        Void)
forall ntnAddr ntnVersion ntnVersionData ntcAddr ntcVersion
       ntcVersionData (m :: * -> *) a.
Applications
  ntnAddr
  ntnVersion
  ntnVersionData
  ntcAddr
  ntcVersion
  ntcVersionData
  m
  a
-> Versions
     ntnVersion
     ntnVersionData
     (OuroborosBundleWithExpandedCtx
        'InitiatorMode ntnAddr ByteString m a Void)
daApplicationInitiatorMode Applications
  SockAddr
  NodeToNodeVersion
  NodeToNodeVersionData
  LocalAddress
  NodeToClientVersion
  NodeToClientVersionData
  IO
  a
applications)


-- | Contramap context from `ExpandedInitiatorContext` to `MinimalInitiatorContext`.
--
expandContext :: MinimalInitiatorContext  RemoteAddress
              -> ExpandedInitiatorContext RemoteAddress IO
expandContext :: MinimalInitiatorContext SockAddr
-> ExpandedInitiatorContext SockAddr IO
expandContext MinimalInitiatorContext { micConnectionId :: forall addr. MinimalInitiatorContext addr -> ConnectionId addr
micConnectionId = ConnectionId SockAddr
connId } =
              ExpandedInitiatorContext {
                eicConnectionId :: ConnectionId SockAddr
eicConnectionId    = ConnectionId SockAddr
connId,
                eicControlMessage :: ControlMessageSTM IO
eicControlMessage  = Proxy IO -> ControlMessageSTM IO
forall (m :: * -> *) (proxy :: (* -> *) -> *).
Applicative (STM m) =>
proxy m -> ControlMessageSTM m
continueForever Proxy IO
forall {k} (t :: k). Proxy t
Proxy,
                eicIsBigLedgerPeer :: IsBigLedgerPeer
eicIsBigLedgerPeer = IsBigLedgerPeer
IsNotBigLedgerPeer
              }