{-# LANGUAGE BangPatterns        #-}
{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE LambdaCase          #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE RecordWildCards     #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Ouroboros.Network.PeerSelection.Governor.KnownPeers
  ( belowTarget
  , aboveTarget
  ) where

import Data.Hashable
import Data.List (sortBy)
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe (fromMaybe)
import Data.Set qualified as Set
import GHC.Stack (HasCallStack)
import System.Random (random)

import Control.Concurrent.JobPool (Job (..))
import Control.Exception (Exception (..), SomeException, assert)
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadTimer.SI

import Ouroboros.Network.Diffusion.Policies qualified as Policies
import Ouroboros.Network.PeerSelection.Bootstrap (requiresBootstrapPeers)
import Ouroboros.Network.PeerSelection.Governor.Types
import Ouroboros.Network.PeerSelection.PeerAdvertise (PeerAdvertise (..))
import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing (..))
import Ouroboros.Network.PeerSelection.PublicRootPeers qualified as PublicRootPeers
import Ouroboros.Network.PeerSelection.State.EstablishedPeers qualified as EstablishedPeers
import Ouroboros.Network.PeerSelection.State.KnownPeers qualified as KnownPeers
import Ouroboros.Network.PeerSelection.State.LocalRootPeers qualified as LocalRootPeers
import Ouroboros.Network.Protocol.PeerSharing.Type (PeerSharingAmount)


---------------------------
-- Known peers below target
--

-- | If we are below the target of /known peers/ we try to learn about new
-- peers.  A coin flip (drawn from the state's 'stdGen') decides between two
-- sources:
--
-- * inbound connections (\"light peer sharing\"); or
-- * peer share requests sent to established peers.
--
-- Inbound peers are used only when no peer share request is in flight, the
-- inbound retry time has passed, and either the coin flip selected them or
-- there is no established peer available for peer sharing.  Otherwise, as
-- long as we are below the limit of in-progress peer share requests and some
-- peer is available, a peer share job is started.
--
-- Both sources are disabled when 'requiresBootstrapPeers' holds for the
-- current bootstrap flag and ledger state judgement (i.e. the node is in a
-- sensitive state), and both require peer sharing to be enabled.
--
-- If a peer share request would be possible except that no peer is currently
-- available, the decision is skipped with the next peer share wakeup time
-- (if any); in every other case it is skipped without a wakeup time.
--
belowTarget
    :: (MonadAsync m, MonadTimer m, Ord peeraddr, Hashable peeraddr)
    => PeerSelectionActions peeraddr peerconn m
    -> Time -- ^ blocked at
    -> Map peeraddr PeerSharing -- ^ inbound peers
    -> MkGuardedDecision peeraddr peerconn m
belowTarget actions@PeerSelectionActions { peerSharing }
            blockedAt
            inboundPeers
            policy@PeerSelectionPolicy {
              policyMaxInProgressPeerShareReqs,
              policyPickKnownPeersForPeerShare,
              policyPickInboundPeers,
              policyPeerShareRetryTime
            }
            st@PeerSelectionState {
              knownPeers,
              establishedPeers,
              inProgressPeerShareReqs,
              inProgressDemoteToCold,
              inboundPeersRetryTime,
              targets = PeerSelectionTargets {
                          targetNumberOfKnownPeers
                        },
              ledgerStateJudgement,
              bootstrapPeersFlag,
              stdGen
            }
    --
    -- Light peer sharing
    --

  | PeerSharingEnabled <- peerSharing
    -- Are we under target for number of known peers?
  , numKnownPeers < targetNumberOfKnownPeers

    -- There are no peer share requests in-flight.
  , inProgressPeerShareReqs <= 0

    -- No inbound peers should be used when the node is using bootstrap peers.
  , not (requiresBootstrapPeers bootstrapPeersFlag ledgerStateJudgement)

    -- Do not retry inbound peers before the retry time recorded in the state.
  , blockedAt >= inboundPeersRetryTime

    -- Use inbound peers either if it won the coin flip or if there are no
    -- available peers to do peer sharing.
  , useInboundPeers || Set.null availableForPeerShare

    -- Only inbound peers which are not already known are of interest.
  , let availablePeers = Map.keysSet inboundPeers
                  Set.\\ KnownPeers.toSet knownPeers
  , not (Set.null availablePeers)

  = Guarded Nothing $ do
      selected <- pickUnknownPeers
                  st
                  policyPickInboundPeers
                  availablePeers
                  (Policies.maxInboundPeers `min` (targetNumberOfKnownPeers - numKnownPeers))
      let selectedMap = inboundPeers `Map.restrictKeys` selected
      return $ \now -> Decision {
          decisionTrace = [TracePickInboundPeers
                            targetNumberOfKnownPeers
                            numKnownPeers
                            selectedMap
                            availablePeers
                          ],
          decisionState = st { knownPeers = KnownPeers.setSuccessfulConnectionFlag selected
                                          $ KnownPeers.insert
                                              (Map.map (\ps -> (Just ps, Just DoAdvertisePeer)) selectedMap)
                                              knownPeers,
                               inboundPeersRetryTime = Policies.inboundPeersRetryDelay `addTime` now,
                               -- the coin flip consumed randomness
                               stdGen = stdGen'

                             },
          decisionJobs = []
        }

    --
    -- Peer sharing
    --

  | PeerSharingEnabled <- peerSharing
    -- Are we under target for number of known peers?
  , numKnownPeers < targetNumberOfKnownPeers

    -- Are we at our limit for number of peer share requests?
  , numPeerShareReqsPossible > 0

    -- Are there any known peers that we can send a peer share request to?
    -- We can only ask ones where we have not asked them within a certain time.
  , not (Set.null availableForPeerShare)

    -- No peer share requests should be issued when the node is using bootstrap
    -- peers.
  , not (requiresBootstrapPeers bootstrapPeersFlag ledgerStateJudgement)

  = Guarded Nothing $ do
      -- Max selected should be <= numPeerShareReqsPossible
      selectedForPeerShare <- pickPeers st
                              policyPickKnownPeersForPeerShare
                              availableForPeerShare
                              numPeerShareReqsPossible

      let -- Should be <= numPeerShareReqsPossible
          numPeerShareReqs = Set.size selectedForPeerShare
          objective        = targetNumberOfKnownPeers - numKnownPeers
          -- Split current peer target objective across all peer sharing
          -- candidates, clamping the per-peer amount to the range 8..255:
          -- if the even share is smaller than 8 we still ask each peer for
          -- 8, and we never ask for more than 255.
          --
          -- This is to increase diversity.
          --
          -- NOTE(review): this divides by the number of selected peers; it
          -- assumes 'pickPeers' never returns an empty set -- confirm.
          numPeersToReq :: PeerSharingAmount
          !numPeersToReq = fromIntegral
                         $ min 255 (max 8 (objective `div` numPeerShareReqs))
          (salt, stdGen'') = random stdGen'

      return $ \now -> Decision {
        decisionTrace = [TracePeerShareRequests
                          targetNumberOfKnownPeers
                          numKnownPeers
                          numPeersToReq
                          availableForPeerShare
                          selectedForPeerShare],
        decisionState = st {
                          inProgressPeerShareReqs = inProgressPeerShareReqs
                                                  + numPeerShareReqs,
                          establishedPeers = EstablishedPeers.setPeerShareTime
                                              selectedForPeerShare
                                              (addTime policyPeerShareRetryTime now)
                                              establishedPeers,
                          stdGen = stdGen''
                        },
        decisionJobs  =
          -- 'jobPeerShare' takes the random salt first and the maximum
          -- number of new peers to keep (our objective) second.
          [jobPeerShare actions policy salt objective numPeersToReq
             (Set.toList selectedForPeerShare)]
      }

    -- If we could peer share except that there are none currently available
    -- then we return the next wakeup time (if any)
  | numKnownPeers < targetNumberOfKnownPeers
  , numPeerShareReqsPossible > 0
  , Set.null availableForPeerShare
  = GuardedSkip $ EstablishedPeers.minPeerShareTime establishedPeers

  | otherwise
  = GuardedSkip Nothing
  where
    -- Coin flip deciding between inbound peers and peer sharing.
    (useInboundPeers, stdGen') = random stdGen
    PeerSelectionCounters {
        numberOfKnownPeers = numKnownPeers
      }
      =
      peerSelectionStateToCounters st
    -- How many more peer share requests the policy allows us to start.
    numPeerShareReqsPossible = policyMaxInProgressPeerShareReqs
                             - inProgressPeerShareReqs
    -- Only peers which permit peer sharing are available; peers which are
    -- being demoted to cold are excluded.
    availableForPeerShare    = EstablishedPeers.availableForPeerShare establishedPeers
                             Set.\\ inProgressDemoteToCold

---------------------------
-- Peer sharing job
--


-- | The peer sharing job is run in two stages. The expected path is for all
-- peer sharing request to return within a short timeout. The second phase is
-- with a longer timeout for all still outstanding requests.
--
-- The result from each phase is filtered. Already known peers and big ledger
-- peers are removed before adding them to known peers. Big ledger peers are
-- popular so they don't need to be shared through peer sharing. However ledger
-- peers belonging to smaller pools shouldn't be discarded. Smaller pools could
-- use extra upstream peers and we spread out the load in the network.
--
-- If we ask for more peers than needed a random subset of the peers in the filtered result
-- is used.
jobPeerShare :: forall m peeraddr peerconn.
                (MonadAsync m, MonadTimer m, Ord peeraddr, Hashable peeraddr)
             => PeerSelectionActions peeraddr peerconn m
             -> PeerSelectionPolicy peeraddr m
             -> Int
             -> Int
             -> PeerSharingAmount
             -> [peeraddr]
             -> Job () m (Completion m peeraddr peerconn)
jobPeerShare :: forall (m :: * -> *) peeraddr peerconn.
(MonadAsync m, MonadTimer m, Ord peeraddr, Hashable peeraddr) =>
PeerSelectionActions peeraddr peerconn m
-> PeerSelectionPolicy peeraddr m
-> Int
-> Int
-> PeerSharingAmount
-> [peeraddr]
-> Job () m (Completion m peeraddr peerconn)
jobPeerShare PeerSelectionActions{PeerSharingAmount -> peeraddr -> m (PeerSharingResult peeraddr)
requestPeerShare :: PeerSharingAmount -> peeraddr -> m (PeerSharingResult peeraddr)
requestPeerShare :: forall peeraddr peerconn (m :: * -> *).
PeerSelectionActions peeraddr peerconn m
-> PeerSharingAmount -> peeraddr -> m (PeerSharingResult peeraddr)
requestPeerShare}
             PeerSelectionPolicy { DiffTime
policyPeerShareBatchWaitTime :: DiffTime
policyPeerShareBatchWaitTime :: forall peeraddr (m :: * -> *).
PeerSelectionPolicy peeraddr m -> DiffTime
policyPeerShareBatchWaitTime
                                 , DiffTime
policyPeerShareOverallTimeout :: DiffTime
policyPeerShareOverallTimeout :: forall peeraddr (m :: * -> *).
PeerSelectionPolicy peeraddr m -> DiffTime
policyPeerShareOverallTimeout
                                 }
             Int
salt Int
maxAmount
             PeerSharingAmount
requestAmount =
    \[peeraddr]
peers -> m (Completion m peeraddr peerconn)
-> (SomeException -> m (Completion m peeraddr peerconn))
-> ()
-> String
-> Job () m (Completion m peeraddr peerconn)
forall group (m :: * -> *) a.
m a -> (SomeException -> m a) -> group -> String -> Job group m a
Job ([peeraddr] -> m (Completion m peeraddr peerconn)
jobPhase1 [peeraddr]
peers) ([peeraddr] -> SomeException -> m (Completion m peeraddr peerconn)
handler [peeraddr]
peers) () String
"peerSharePhase1"
  where
    -- Return n random peers from a list of peers.
    --
    -- Every jobPeerShare will be called with a new random salt.
    -- This means that even if presented with the same list peers their ordering
    -- will be unpredictable.
    takeNPeers :: Int -> [peeraddr] -> [peeraddr]
    takeNPeers :: Int -> [peeraddr] -> [peeraddr]
takeNPeers Int
n [peeraddr]
addrs = Int -> [peeraddr] -> [peeraddr]
forall a. Int -> [a] -> [a]
take Int
n ([peeraddr] -> [peeraddr]) -> [peeraddr] -> [peeraddr]
forall a b. (a -> b) -> a -> b
$
      (peeraddr -> peeraddr -> Ordering) -> [peeraddr] -> [peeraddr]
forall a. (a -> a -> Ordering) -> [a] -> [a]
sortBy (\peeraddr
a peeraddr
b -> Int -> Int -> Ordering
forall a. Ord a => a -> a -> Ordering
compare (Int -> peeraddr -> Int
forall a. Hashable a => Int -> a -> Int
hashWithSalt Int
salt peeraddr
a) (Int -> peeraddr -> Int
forall a. Hashable a => Int -> a -> Int
hashWithSalt Int
salt peeraddr
b))
      [peeraddr]
addrs

    handler :: [peeraddr] -> SomeException -> m (Completion m peeraddr peerconn)
    handler :: [peeraddr] -> SomeException -> m (Completion m peeraddr peerconn)
handler [peeraddr]
peers SomeException
e = Completion m peeraddr peerconn
-> m (Completion m peeraddr peerconn)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Completion m peeraddr peerconn
 -> m (Completion m peeraddr peerconn))
-> Completion m peeraddr peerconn
-> m (Completion m peeraddr peerconn)
forall a b. (a -> b) -> a -> b
$
      (PeerSelectionState peeraddr peerconn
 -> Time -> Decision m peeraddr peerconn)
-> Completion m peeraddr peerconn
forall (m :: * -> *) peeraddr peerconn.
(PeerSelectionState peeraddr peerconn
 -> Time -> Decision m peeraddr peerconn)
-> Completion m peeraddr peerconn
Completion ((PeerSelectionState peeraddr peerconn
  -> Time -> Decision m peeraddr peerconn)
 -> Completion m peeraddr peerconn)
-> (PeerSelectionState peeraddr peerconn
    -> Time -> Decision m peeraddr peerconn)
-> Completion m peeraddr peerconn
forall a b. (a -> b) -> a -> b
$ \PeerSelectionState peeraddr peerconn
st Time
_ ->
      Decision { decisionTrace :: [TracePeerSelection peeraddr]
decisionTrace = [[(peeraddr, Either SomeException (PeerSharingResult peeraddr))]
-> TracePeerSelection peeraddr
forall peeraddr.
[(peeraddr, Either SomeException (PeerSharingResult peeraddr))]
-> TracePeerSelection peeraddr
TracePeerShareResults [ (peeraddr
p, SomeException -> Either SomeException (PeerSharingResult peeraddr)
forall a b. a -> Either a b
Left SomeException
e) | peeraddr
p <- [peeraddr]
peers ]],
                 decisionState :: PeerSelectionState peeraddr peerconn
decisionState =
                  PeerSelectionState peeraddr peerconn
st { inProgressPeerShareReqs = inProgressPeerShareReqs st
                                               - length peers
                     },
                 decisionJobs :: [Job () m (Completion m peeraddr peerconn)]
decisionJobs = []
               }

    jobPhase1 :: [peeraddr] -> m (Completion m peeraddr peerconn)
    jobPhase1 :: [peeraddr] -> m (Completion m peeraddr peerconn)
jobPhase1 [peeraddr]
peers = do
      -- In the typical case, where most requests return within a short
      -- timeout we want to collect all the responses into a batch and
      -- add them to the known peers set in one go.
      --
      -- So fire them all off in one go:
      peerShares <- [m (Async m (PeerSharingResult peeraddr))]
-> m [Async m (PeerSharingResult peeraddr)]
forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
forall (m :: * -> *) a. Monad m => [m a] -> m [a]
sequence [ m (PeerSharingResult peeraddr)
-> m (Async m (PeerSharingResult peeraddr))
forall a. m a -> m (Async m a)
forall (m :: * -> *) a. MonadAsync m => m a -> m (Async m a)
async (PeerSharingAmount -> peeraddr -> m (PeerSharingResult peeraddr)
requestPeerShare PeerSharingAmount
requestAmount peeraddr
peer)
                             | peeraddr
peer <- [peeraddr]
peers ]

      -- First to finish synchronisation between /all/ the peer share requests
      -- completing or the timeout (with whatever partial results we have at
      -- the time)
      results <- waitAllCatchOrTimeout peerShares policyPeerShareBatchWaitTime
      case results of
        Right [Either SomeException (PeerSharingResult peeraddr)]
totalResults ->
          Completion m peeraddr peerconn
-> m (Completion m peeraddr peerconn)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Completion m peeraddr peerconn
 -> m (Completion m peeraddr peerconn))
-> Completion m peeraddr peerconn
-> m (Completion m peeraddr peerconn)
forall a b. (a -> b) -> a -> b
$ (PeerSelectionState peeraddr peerconn
 -> Time -> Decision m peeraddr peerconn)
-> Completion m peeraddr peerconn
forall (m :: * -> *) peeraddr peerconn.
(PeerSelectionState peeraddr peerconn
 -> Time -> Decision m peeraddr peerconn)
-> Completion m peeraddr peerconn
Completion ((PeerSelectionState peeraddr peerconn
  -> Time -> Decision m peeraddr peerconn)
 -> Completion m peeraddr peerconn)
-> (PeerSelectionState peeraddr peerconn
    -> Time -> Decision m peeraddr peerconn)
-> Completion m peeraddr peerconn
forall a b. (a -> b) -> a -> b
$ \PeerSelectionState peeraddr peerconn
st Time
_ ->
           let peerResults :: [(peeraddr, Either SomeException (PeerSharingResult peeraddr))]
peerResults = [peeraddr]
-> [Either SomeException (PeerSharingResult peeraddr)]
-> [(peeraddr, Either SomeException (PeerSharingResult peeraddr))]
forall a b. [a] -> [b] -> [(a, b)]
zip [peeraddr]
peers [Either SomeException (PeerSharingResult peeraddr)]
totalResults
               newPeers :: [peeraddr]
newPeers    = Int -> [peeraddr] -> [peeraddr]
takeNPeers Int
maxAmount ([peeraddr] -> [peeraddr]) -> [peeraddr] -> [peeraddr]
forall a b. (a -> b) -> a -> b
$
                                 [ peeraddr
p | Right (PeerSharingResult [peeraddr]
ps) <- [Either SomeException (PeerSharingResult peeraddr)]
totalResults
                                 , peeraddr
p <- [peeraddr]
ps
                                 , Bool -> Bool
not (peeraddr -> KnownPeers peeraddr -> Bool
forall peeraddr.
Ord peeraddr =>
peeraddr -> KnownPeers peeraddr -> Bool
KnownPeers.member peeraddr
p (PeerSelectionState peeraddr peerconn -> KnownPeers peeraddr
forall peeraddr peerconn.
PeerSelectionState peeraddr peerconn -> KnownPeers peeraddr
knownPeers PeerSelectionState peeraddr peerconn
st))
                                 , peeraddr -> Set peeraddr -> Bool
forall a. Ord a => a -> Set a -> Bool
Set.notMember peeraddr
p (PublicRootPeers peeraddr -> Set peeraddr
forall peeraddr. PublicRootPeers peeraddr -> Set peeraddr
PublicRootPeers.getBigLedgerPeers (PeerSelectionState peeraddr peerconn -> PublicRootPeers peeraddr
forall peeraddr peerconn.
PeerSelectionState peeraddr peerconn -> PublicRootPeers peeraddr
publicRootPeers PeerSelectionState peeraddr peerconn
st))]
            in Decision { decisionTrace :: [TracePeerSelection peeraddr]
decisionTrace = [ [(peeraddr, Either SomeException (PeerSharingResult peeraddr))]
-> TracePeerSelection peeraddr
forall peeraddr.
[(peeraddr, Either SomeException (PeerSharingResult peeraddr))]
-> TracePeerSelection peeraddr
TracePeerShareResults [(peeraddr, Either SomeException (PeerSharingResult peeraddr))]
peerResults
                                          , [peeraddr] -> TracePeerSelection peeraddr
forall peeraddr. [peeraddr] -> TracePeerSelection peeraddr
TracePeerShareResultsFiltered [peeraddr]
newPeers
                                          ]
                        , decisionState :: PeerSelectionState peeraddr peerconn
decisionState =
                           PeerSelectionState peeraddr peerconn
st { -- TODO: also update with the failures
                                knownPeers = KnownPeers.alter
                                              (\Maybe KnownPeerInfo
x -> case Maybe KnownPeerInfo
x of
                                                Maybe KnownPeerInfo
Nothing ->
                                                  (Maybe PeerSharing, Maybe PeerAdvertise)
-> Maybe KnownPeerInfo -> Maybe KnownPeerInfo
KnownPeers.alterKnownPeerInfo
                                                    (Maybe PeerSharing
forall a. Maybe a
Nothing, PeerAdvertise -> Maybe PeerAdvertise
forall a. a -> Maybe a
Just PeerAdvertise
DoAdvertisePeer)
                                                    Maybe KnownPeerInfo
x
                                                Just KnownPeerInfo
_ ->
                                                  (Maybe PeerSharing, Maybe PeerAdvertise)
-> Maybe KnownPeerInfo -> Maybe KnownPeerInfo
KnownPeers.alterKnownPeerInfo
                                                    (Maybe PeerSharing
forall a. Maybe a
Nothing, Maybe PeerAdvertise
forall a. Maybe a
Nothing)
                                                    Maybe KnownPeerInfo
x
                                              )
                                              (Set.fromList newPeers)
                                              (knownPeers st),
                                inProgressPeerShareReqs = inProgressPeerShareReqs st
                                                        - length peers
                           }
                        , decisionJobs :: [Job () m (Completion m peeraddr peerconn)]
decisionJobs  = []
                        }

        -- But if any don't make the first timeout then they'll be added later
        -- when they do reply or never if we hit the hard timeout.
        Left [Maybe (Either SomeException (PeerSharingResult peeraddr))]
partialResults -> do

          -- We have to keep track of the relationship between the peer
          -- addresses and the peer share requests, completed and still in progress:
          let peerResults :: [(peeraddr, Either SomeException (PeerSharingResult peeraddr))]
peerResults      = [ (peeraddr
p, Either SomeException (PeerSharingResult peeraddr)
r)
                                 | (peeraddr
p, Just Either SomeException (PeerSharingResult peeraddr)
r)  <- [peeraddr]
-> [Maybe (Either SomeException (PeerSharingResult peeraddr))]
-> [(peeraddr,
     Maybe (Either SomeException (PeerSharingResult peeraddr)))]
forall a b. [a] -> [b] -> [(a, b)]
zip [peeraddr]
peers [Maybe (Either SomeException (PeerSharingResult peeraddr))]
partialResults ]
              peersRemaining :: [peeraddr]
peersRemaining   = [  peeraddr
p
                                 | (peeraddr
p, Maybe (Either SomeException (PeerSharingResult peeraddr))
Nothing) <- [peeraddr]
-> [Maybe (Either SomeException (PeerSharingResult peeraddr))]
-> [(peeraddr,
     Maybe (Either SomeException (PeerSharingResult peeraddr)))]
forall a b. [a] -> [b] -> [(a, b)]
zip [peeraddr]
peers [Maybe (Either SomeException (PeerSharingResult peeraddr))]
partialResults ]
              peerSharesRemaining :: [Async m (PeerSharingResult peeraddr)]
peerSharesRemaining = [  Async m (PeerSharingResult peeraddr)
a
                                    | (Async m (PeerSharingResult peeraddr)
a, Maybe (Either SomeException (PeerSharingResult peeraddr))
Nothing) <- [Async m (PeerSharingResult peeraddr)]
-> [Maybe (Either SomeException (PeerSharingResult peeraddr))]
-> [(Async m (PeerSharingResult peeraddr),
     Maybe (Either SomeException (PeerSharingResult peeraddr)))]
forall a b. [a] -> [b] -> [(a, b)]
zip [Async m (PeerSharingResult peeraddr)]
peerShares [Maybe (Either SomeException (PeerSharingResult peeraddr))]
partialResults ]

          Completion m peeraddr peerconn
-> m (Completion m peeraddr peerconn)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Completion m peeraddr peerconn
 -> m (Completion m peeraddr peerconn))
-> Completion m peeraddr peerconn
-> m (Completion m peeraddr peerconn)
forall a b. (a -> b) -> a -> b
$ (PeerSelectionState peeraddr peerconn
 -> Time -> Decision m peeraddr peerconn)
-> Completion m peeraddr peerconn
forall (m :: * -> *) peeraddr peerconn.
(PeerSelectionState peeraddr peerconn
 -> Time -> Decision m peeraddr peerconn)
-> Completion m peeraddr peerconn
Completion ((PeerSelectionState peeraddr peerconn
  -> Time -> Decision m peeraddr peerconn)
 -> Completion m peeraddr peerconn)
-> (PeerSelectionState peeraddr peerconn
    -> Time -> Decision m peeraddr peerconn)
-> Completion m peeraddr peerconn
forall a b. (a -> b) -> a -> b
$ \PeerSelectionState peeraddr peerconn
st Time
_ ->
            let newPeers :: [peeraddr]
newPeers = Int -> [peeraddr] -> [peeraddr]
takeNPeers Int
maxAmount ([peeraddr] -> [peeraddr]) -> [peeraddr] -> [peeraddr]
forall a b. (a -> b) -> a -> b
$
                               [ peeraddr
p | Just (Right (PeerSharingResult [peeraddr]
ps)) <- [Maybe (Either SomeException (PeerSharingResult peeraddr))]
partialResults
                               , peeraddr
p <- [peeraddr]
ps
                               , Bool -> Bool
not (peeraddr -> KnownPeers peeraddr -> Bool
forall peeraddr.
Ord peeraddr =>
peeraddr -> KnownPeers peeraddr -> Bool
KnownPeers.member peeraddr
p (PeerSelectionState peeraddr peerconn -> KnownPeers peeraddr
forall peeraddr peerconn.
PeerSelectionState peeraddr peerconn -> KnownPeers peeraddr
knownPeers PeerSelectionState peeraddr peerconn
st))
                               , peeraddr -> Set peeraddr -> Bool
forall a. Ord a => a -> Set a -> Bool
Set.notMember peeraddr
p (PublicRootPeers peeraddr -> Set peeraddr
forall peeraddr. PublicRootPeers peeraddr -> Set peeraddr
PublicRootPeers.getBigLedgerPeers (PeerSelectionState peeraddr peerconn -> PublicRootPeers peeraddr
forall peeraddr peerconn.
PeerSelectionState peeraddr peerconn -> PublicRootPeers peeraddr
publicRootPeers PeerSelectionState peeraddr peerconn
st))]
             in Decision { decisionTrace :: [TracePeerSelection peeraddr]
decisionTrace = [ [(peeraddr, Either SomeException (PeerSharingResult peeraddr))]
-> TracePeerSelection peeraddr
forall peeraddr.
[(peeraddr, Either SomeException (PeerSharingResult peeraddr))]
-> TracePeerSelection peeraddr
TracePeerShareResults [(peeraddr, Either SomeException (PeerSharingResult peeraddr))]
peerResults
                                           , [peeraddr] -> TracePeerSelection peeraddr
forall peeraddr. [peeraddr] -> TracePeerSelection peeraddr
TracePeerShareResultsFiltered [peeraddr]
newPeers
                                           ]
                         , decisionState :: PeerSelectionState peeraddr peerconn
decisionState =
                            PeerSelectionState peeraddr peerconn
st { -- TODO: also update with the failures
                                 knownPeers = KnownPeers.alter
                                               (\Maybe KnownPeerInfo
x -> case Maybe KnownPeerInfo
x of
                                                 Maybe KnownPeerInfo
Nothing ->
                                                   (Maybe PeerSharing, Maybe PeerAdvertise)
-> Maybe KnownPeerInfo -> Maybe KnownPeerInfo
KnownPeers.alterKnownPeerInfo
                                                     (Maybe PeerSharing
forall a. Maybe a
Nothing, PeerAdvertise -> Maybe PeerAdvertise
forall a. a -> Maybe a
Just PeerAdvertise
DoAdvertisePeer)
                                                     Maybe KnownPeerInfo
x
                                                 Just KnownPeerInfo
_ ->
                                                   (Maybe PeerSharing, Maybe PeerAdvertise)
-> Maybe KnownPeerInfo -> Maybe KnownPeerInfo
KnownPeers.alterKnownPeerInfo
                                                     (Maybe PeerSharing
forall a. Maybe a
Nothing, Maybe PeerAdvertise
forall a. Maybe a
Nothing)
                                                     Maybe KnownPeerInfo
x
                                               )
                                               (Set.fromList newPeers)
                                               (knownPeers st),
                                 inProgressPeerShareReqs = inProgressPeerShareReqs st
                                                         - length peerResults
                               }
                         , decisionJobs :: [Job () m (Completion m peeraddr peerconn)]
decisionJobs  = [m (Completion m peeraddr peerconn)
-> (SomeException -> m (Completion m peeraddr peerconn))
-> ()
-> String
-> Job () m (Completion m peeraddr peerconn)
forall group (m :: * -> *) a.
m a -> (SomeException -> m a) -> group -> String -> Job group m a
Job (Int
-> [peeraddr]
-> [Async m (PeerSharingResult peeraddr)]
-> m (Completion m peeraddr peerconn)
jobPhase2 (Int
maxAmount Int -> Int -> Int
forall a. Num a => a -> a -> a
- [peeraddr] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [peeraddr]
newPeers) [peeraddr]
peersRemaining
                                                 [Async m (PeerSharingResult peeraddr)]
peerSharesRemaining)
                                                ([peeraddr] -> SomeException -> m (Completion m peeraddr peerconn)
handler [peeraddr]
peersRemaining)
                                                ()
                                                String
"peerSharePhase2"]
                         }

    -- | Second phase of a peer share job.
    --
    -- Waits for the peer share requests that were still outstanding at the
    -- end of the first phase, for at most the overall timeout minus the batch
    -- wait time.  Requests that still have no result after that wait are
    -- cancelled.
    --
    -- Arguments:
    --
    -- * @maxRemaining@: bound passed to 'takeNPeers' when selecting the newly
    --   learned peers.
    -- * @peers@: the peers with an outstanding request; expected to be in the
    --   same order as @peerShares@ since the two lists are zipped with the
    --   same result list.
    -- * @peerShares@: the asyncs of the outstanding requests.
    --
    -- The resulting 'Completion' adds the selected peers to the known peers,
    -- decrements 'inProgressPeerShareReqs' by @length peers@ and schedules no
    -- further jobs.
    --
    jobPhase2 :: Int -> [peeraddr] -> [Async m (PeerSharingResult peeraddr)]
              -> m (Completion m peeraddr peerconn)
    jobPhase2 maxRemaining peers peerShares = do

      -- Wait again, for all remaining to finish or a timeout.
      results <- waitAllCatchOrTimeout
                      peerShares
                      (policyPeerShareOverallTimeout
                       - policyPeerShareBatchWaitTime)
      let -- Pair every peer with its outcome; requests without a result are
          -- reported as 'AsyncCancelled', since they get cancelled below.
          peerResults =
            case results of
              Right totalResults  -> zip peers totalResults
              Left partialResults -> [ (p, fromMaybe err r)
                                     | (p, r) <- zip peers partialResults ]
                where err = Left (toException AsyncCancelled)

          -- The asyncs which have not produced any result yet.
          peerSharesIncomplete =
            case results of
              Right _totalResults -> []
              Left partialResults ->
                [ a | (a, Nothing) <- zip peerShares partialResults ]

      mapM_ cancel peerSharesIncomplete

      return $ Completion $ \st _ ->
        let -- Address lists of all successful replies, in request order.
            sharedPeerLists =
              case results of
                Right totalResults  ->
                  [ ps | Right (PeerSharingResult ps) <- totalResults ]
                Left partialResults ->
                  [ ps | Just (Right (PeerSharingResult ps)) <- partialResults ]

            -- Keep only addresses which are neither already known nor big
            -- ledger peers.
            newPeers = takeNPeers maxRemaining
              [ p | ps <- sharedPeerLists
                  , p  <- ps
                  , not (KnownPeers.member p (knownPeers st))
                  , Set.notMember p (PublicRootPeers.getBigLedgerPeers (publicRootPeers st))
                  ]

         in Decision { decisionTrace = [ TracePeerShareResults peerResults
                                       , TracePeerShareResultsFiltered newPeers
                                       ]
                     , decisionState =
                        st { -- TODO: also update with the failures
                             knownPeers = KnownPeers.alter
                                           (\x -> case x of
                                             -- not known before: request
                                             -- 'DoAdvertisePeer'
                                             Nothing ->
                                               KnownPeers.alterKnownPeerInfo
                                                 (Nothing, Just DoAdvertisePeer)
                                                 x
                                             -- already known: no new
                                             -- information is supplied
                                             Just _ ->
                                               KnownPeers.alterKnownPeerInfo
                                                 (Nothing, Nothing)
                                                 x
                                           )
                                           (Set.fromList newPeers)
                                           (knownPeers st),
                             inProgressPeerShareReqs = inProgressPeerShareReqs st
                                                     - length peers
                           }
                     , decisionJobs  = []
                     }


---------------------------
-- Known peers above target
--


-- | If we are above the target of /known peers/ (i.e. /cold/, /warm/ and /hot/
-- combined), we drop some of the /cold peers/ but we protect the
-- 'targetNumberOfRootPeers' (from combined sets of /local/ and /public root/
-- peers). 'policyPickColdPeersToForget' policy is used to pick the peers.
--
-- Peers which are established, local roots, in the process of being promoted
-- from cold, or big ledger peers are never candidates for forgetting.  If no
-- candidate is left the decision is skipped ('GuardedSkip').
--
-- The resulting decision removes the selected peers from both 'knownPeers'
-- and 'publicRootPeers' and schedules no jobs.
--
aboveTarget :: (MonadSTM m, Ord peeraddr, HasCallStack)
            => MkGuardedDecision peeraddr peerconn m
aboveTarget PeerSelectionPolicy {
              policyPickColdPeersToForget
            }
            st@PeerSelectionState {
              localRootPeers,
              publicRootPeers,
              knownPeers,
              establishedPeers,
              inProgressPromoteCold,
              targets = PeerSelectionTargets {
                          targetNumberOfKnownPeers,
                          targetNumberOfRootPeers
                        }
            }
    -- Are we above the target for number of known peers?
  | numKnownPeers > targetNumberOfKnownPeers

    -- Are there any cold peers we could pick to forget?
    -- As a first cheap approximation, check if there are any cold peers.
  , numKnownPeers > numEstablishedPeers

    -- Beyond this it gets more complicated, and it is not clear that there
    -- are any precise cheap checks. So we just do the full calculation.
    -- In particular there can be overlap between cold peers and root peers
    -- and we have constraints on forgetting root peers.
    --
    -- We must never pick local root peers to forget as this would violate
    -- our invariant that the localRootPeers is a subset of the knownPeers.
    --
    -- We also need to avoid picking public root peers if that would put us
    -- below the target for root peers.
    --
  , let numRootPeersCanForget = LocalRootPeers.size localRootPeers
                              + PublicRootPeers.size publicRootPeers
                              - targetNumberOfRootPeers
        availableToForget     = KnownPeers.toSet knownPeers
                                  Set.\\ EstablishedPeers.toSet establishedPeers
                                  Set.\\ LocalRootPeers.keysSet localRootPeers
                                  Set.\\ (if numRootPeersCanForget <= 0
                                            then PublicRootPeers.toSet publicRootPeers
                                            else Set.empty)
                                  Set.\\ inProgressPromoteCold
                                  Set.\\ bigLedgerPeersSet

  , not (Set.null availableToForget)
  = Guarded Nothing $ do
      let numOtherPeersToForget         = numKnownPeers
                                        - targetNumberOfKnownPeers
          numPeersToForget
            | numRootPeersCanForget > 0 = min numRootPeersCanForget
                                              numOtherPeersToForget
            | otherwise                 = numOtherPeersToForget
      -- If we /might/ pick a root peer, limit the number to forget so we do
      -- not pick too many root peers. This may cause us to go round several
      -- times but that is ok.
      selectedToForget <- pickPeers st
                            policyPickColdPeersToForget
                            availableToForget
                            numPeersToForget
      return $ \_now ->
        let knownPeers'      = KnownPeers.delete
                                 selectedToForget
                                 knownPeers
            publicRootPeers' = publicRootPeers
                                `PublicRootPeers.difference` selectedToForget
            -- Sanity check: every remaining public root peer must still be a
            -- known peer.
        in assert (Set.isSubsetOf
                     (PublicRootPeers.toSet publicRootPeers')
                    (KnownPeers.toSet knownPeers'))

              Decision {
                decisionTrace = [TraceForgetColdPeers
                                   targetNumberOfKnownPeers
                                   numKnownPeers
                                   selectedToForget],
                decisionState = st { knownPeers      = knownPeers',
                                     publicRootPeers = publicRootPeers' },
                decisionJobs  = []
              }

  | otherwise
  = GuardedSkip Nothing
  where
    -- Big ledger peers are excluded from the candidates to forget.
    bigLedgerPeersSet = PublicRootPeers.getBigLedgerPeers publicRootPeers

    -- Counters derived from the current state, used by the guards above.
    PeerSelectionCounters {
        numberOfKnownPeers       = numKnownPeers,
        numberOfEstablishedPeers = numEstablishedPeers
      }
      =
      peerSelectionStateToCounters st


-------------------------------
-- Utils
--

-- | Perform a first-to-finish synchronisation between:
--
-- * /all/ the async actions completing; or
-- * the timeout with whatever partial results we have at the time
--
-- The result list is the same length and order as the asyncs, so the results
-- can be paired up.
--
-- Returns 'Right' with one result per async when all of them completed, in
-- which case the timeout is cancelled.  Otherwise returns 'Left' with one
-- entry per async: 'Nothing' for an async that 'pollSTM' reports as not yet
-- finished.  The unfinished asyncs are /not/ cancelled here; that is left to
-- the caller.
--
waitAllCatchOrTimeout :: (MonadAsync m, MonadTimer m)
                      => [Async m a]
                      -> DiffTime
                      -> m (Either [Maybe (Either SomeException a)]
                                   [Either SomeException a])
waitAllCatchOrTimeout as time = do
    (readTimeout, cancelTimeout) <- registerDelayCancellable time
    results <- atomically $
                         -- all asyncs are done ...
                         (Right <$> mapM waitCatchSTM as)
                         -- ... or the timeout is no longer pending, in which
                         -- case we take a snapshot of what has finished.
                `orElse` (Left  <$> (readTimeout >>= \case
                                       TimeoutPending -> retry
                                       _              -> mapM pollSTM as))
    case results of
      -- The timeout is not needed any more.
      Right{} -> cancelTimeout
      _       -> return ()
    return results