{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies        #-}

module Ouroboros.Network.BlockFetch.DeltaQ
  ( GSV
  , Distribution
  , DeltaQ
  , PeerGSV (..)
  , SizeInBytes
  , PeerFetchInFlightLimits (..)
  , calculatePeerFetchInFlightLimits
  , estimateResponseDeadlineProbability
  , estimateExpectedResponseDuration
  , comparePeerGSV
  , comparePeerGSV'
  ) where

import Control.Monad.Class.MonadTime.SI
import Data.Fixed as Fixed (Pico)
import Data.Hashable
import Data.Set (Set)
import Data.Set qualified as Set

import Ouroboros.Network.DeltaQ


-- | The pair of in-flight byte watermarks used for a single peer, as
-- produced by 'calculatePeerFetchInFlightLimits'.
data PeerFetchInFlightLimits = PeerFetchInFlightLimits
  { inFlightBytesHighWatermark :: SizeInBytes
    -- ^ Upper watermark on the number of bytes in flight.
  , inFlightBytesLowWatermark  :: SizeInBytes
    -- ^ Lower watermark on the number of bytes in flight.
  }
  deriving Show

-- | Order two peers by the @g@ component of their 'PeerGSV's, where the @g@
-- of a peer is the sum of its outbound and inbound @g@.
--
-- Peers that are members of the supplied active set have their @g@ scaled by
-- a factor of 0.8, which gives them a slight advantage and avoids switching
-- back and forth between peers with similar @g@.
--
-- In case the two (adjusted) @g@ values are within 5% of the larger one, the
-- salted hash of the @peer@ is used as a tie breaker. The salt is unique per
-- running node, which avoids all nodes preferring the same peer in case of a
-- tie.
comparePeerGSV :: forall peer.
      ( Hashable peer
      , Ord peer
      )
      => Set peer
      -> Int
      -> (PeerGSV, peer)
      -> (PeerGSV, peer)
      -> Ordering
comparePeerGSV activePeers salt (gsvA, peerA) (gsvB, peerB)
    | withinTolerance = compare (hashWithSalt salt peerA)
                                (hashWithSalt salt peerB)
    | otherwise       = compare gA gB
  where
    gA, gB :: DiffTime
    gA = effectiveG gsvA peerA
    gB = effectiveG gsvB peerB

    -- The two values count as tied when they differ by less than 5% of the
    -- larger of the two.
    withinTolerance :: Bool
    withinTolerance = abs (gA - gB) < 0.05 * max gA gB

    -- The total g of a peer, discounted when the peer is currently active.
    effectiveG :: PeerGSV -> peer -> DiffTime
    effectiveG gsv p
      | p `Set.member` activePeers = activeAdvantage * totalG gsv
      | otherwise                  = totalG gsv

    -- Multiplier applied to the g of active peers.
    activeAdvantage :: DiffTime
    activeAdvantage = 0.8

    -- Outbound plus inbound g; the s and v components are ignored.
    totalG :: PeerGSV -> DiffTime
    totalG PeerGSV { outboundGSV = GSV gOut _ _
                   , inboundGSV  = GSV gIn  _ _
                   } = gOut + gIn

-- | Order two peers by the @g@ component of their 'PeerGSV's.
--
-- This is 'comparePeerGSV' applied to an empty set of active peers, so the
-- active status of a peer is not taken into account.
comparePeerGSV' :: forall peer.
      ( Hashable peer
      , Ord peer
      )
      => Int
      -> (PeerGSV, peer)
      -> (PeerGSV, peer)
      -> Ordering
comparePeerGSV' salt lhs rhs = comparePeerGSV Set.empty salt lhs rhs


-- | Derive the in-flight byte watermarks for a peer from its 'PeerGSV'.
--
-- The low watermark is computed from the outbound and inbound @g@, the
-- inbound @s@ and a fixed scheduling margin; the high watermark is twice the
-- low watermark.
calculatePeerFetchInFlightLimits :: PeerGSV -> PeerFetchInFlightLimits
calculatePeerFetchInFlightLimits PeerGSV
                                   { outboundGSV = GSV gOut _ _
                                   , inboundGSV  = GSV gIn sIn _
                                   } =
    PeerFetchInFlightLimits
      { inFlightBytesLowWatermark  = lowWatermark
      , inFlightBytesHighWatermark = highWatermark
      }
  where
    -- We want the remote end to be busy at all times: by the time it has
    -- finished sending its last response there should already be another
    -- request waiting for it. Hence a new request has to be initiated g_out
    -- seconds before the remote end would become idle.
    --
    -- It is more convenient to express this as a number of in-flight bytes
    -- rather than as a time. The two correspond directly via S_in, whose
    -- unit is seconds / octet. The number of bytes that can be in flight
    -- during a time interval t is octets / second * seconds-in-flight, i.e.
    --
    -- > (1/s_in) * t   or equivalently  t/s_in
    --
    -- So g_out seconds before a remote peer becomes idle it will have
    -- \( g_in\/s_in \) bytes inbound. Our request arrives after g_out
    -- seconds, so we should request at minimum \( g_in\/s_in \) bytes.
    --
    -- Scheduling is not perfect and the request cannot be initiated at
    -- exactly the right moment, so we request slightly early (and thus
    -- correspondingly more), allowing for a maximum schedule delay of
    -- 'schedulingMargin' seconds.
    --
    lowWatermark :: SizeInBytes
    lowWatermark = max pipeliningFloor (ceiling (leadTime / secondsPerByte))
      where
        -- To ensure that blockfetch can do pipelining we enforce a minimal
        -- low water mark of at least 3 64k blocks.
        pipeliningFloor :: SizeInBytes
        pipeliningFloor = 3 * 64 * 1024

        -- Both operands are converted to 'Fixed.Pico' before dividing.
        leadTime :: Fixed.Pico
        leadTime = realToFrac (gOut + gIn + schedulingMargin)

        --FIXME: s is now a function of bytes, not unit seconds / octet
        secondsPerByte :: Fixed.Pico
        secondsPerByte = realToFrac (sIn 1)

    -- 20 milliseconds. We desire to make a new decision every 10ms, so this
    -- gives us some margin.
    schedulingMargin :: DiffTime
    schedulingMargin = 2e-2

    -- Note that the minimum above is based on the assumption that we can
    -- react as the /leading/ edge of the low watermark arrives, but in fact
    -- we can only react when the /trailing/ edge arrives.
    -- NOTE(review): the original remark on the consequence of this was left
    -- unfinished ("So when we ...").

    -- The high watermark is a bit arbitrary. It just provides a bit of a
    -- buffer so that we can ask for more in one go, rather than issuing lots
    -- of small requests very frequently.
    highWatermark :: SizeInBytes
    highWatermark = lowWatermark * 2


-- | Estimate the probability that a download completes within a deadline,
-- given the 'PeerGSV', the bytes already in flight and the size of the new
-- blocks to download.
--
-- The response size is taken to be the bytes in flight plus the bytes
-- requested, and the request size is approximated by a small constant.
--
-- This is an appropriate estimator to use in a situation where meeting a
-- known deadline is the goal.
--
estimateResponseDeadlineProbability :: PeerGSV
                                    -> SizeInBytes
                                    -> SizeInBytes
                                    -> DiffTime
                                    -> Double
estimateResponseDeadlineProbability
    PeerGSV{outboundGSV = outGSV, inboundGSV = inGSV}
    bytesInFlight bytesRequested deadline =
    deltaqProbabilityMassBeforeDeadline deadline roundTrip
  where
    -- Trailing edge arrival of the request (outbound) combined with the
    -- trailing edge arrival of the response (inbound).
    roundTrip :: DeltaQ
    roundTrip = gsvTrailingEdgeArrive outGSV requestBytes
             <> gsvTrailingEdgeArrive inGSV  responseBytes

    requestBytes :: SizeInBytes
    requestBytes = 100 -- TODO not exact, but it's small

    responseBytes :: SizeInBytes
    responseBytes = bytesInFlight + bytesRequested


-- | Given the 'PeerGSV', the bytes already in flight and the size of new
-- blocks to download, estimate the typical time to complete the download.
--
-- The estimate is the 50th percentile of the combined ∆Q of the request
-- (outbound) and the response (inbound). The response size is the bytes
-- already in flight plus the bytes newly requested; the request size is not
-- a parameter but is approximated by a small constant.
--
-- This is an appropriate estimator to use when trying to minimise the
-- expected overall download time in the long run (rather than optimising
-- for the worst case in the short term).
--
estimateExpectedResponseDuration :: PeerGSV
                                 -> SizeInBytes -- ^ Bytes already in flight
                                 -> SizeInBytes -- ^ Bytes newly requested
                                 -> DiffTime
estimateExpectedResponseDuration
    PeerGSV{outboundGSV = outGSV, inboundGSV = inGSV}
    bytesInFlight bytesRequested =
    deltaqQ50thPercentile roundTrip
  where
    -- Trailing edge arrival of the request (outbound) combined with the
    -- trailing edge arrival of the response (inbound).
    roundTrip :: DeltaQ
    roundTrip = gsvTrailingEdgeArrive outGSV requestBytes
             <> gsvTrailingEdgeArrive inGSV  responseBytes

    requestBytes :: SizeInBytes
    requestBytes = 100 -- TODO not exact, but it's small

    -- Everything the peer still has to send us: what is already in flight
    -- plus what we are about to ask for.
    responseBytes :: SizeInBytes
    responseBytes = bytesInFlight + bytesRequested

{-
estimateBlockFetchResponse :: PeerGSV
                           -> PeerFetchInFlight header
                           -> [SizeInBytes]
                           -> DiffTime
estimateBlockFetchResponse gsvs
                           PeerFetchInFlight{peerFetchBytesInFlight}
                           blockSizes =
    gsvRequestResponseDuration gsvs reqSize respSize
  where
    reqSize  = 100 -- not exact, but it's small
    respSize = peerFetchBytesInFlight + sum blockSizes


-- | The /trailing/ edge arrival schedule for a bunch of blocks.
--
blockArrivalShedule :: PeerGSV
                    -> PeerFetchInFlight header
                    -> [SizeInBytes]
                    -> [DiffTime]
blockArrivalShedule gsvs
                    PeerFetchInFlight{peerFetchBytesInFlight}
                    blockSizes =
    [ gsvRequestResponseDuration gsvs reqSize respSize
    | respSize <- cumulativeSumFrom peerFetchBytesInFlight blockSizes
    ]
  where
    reqSize = 100 -- not exact, but it's small

    cumulativeSumFrom n = tail . scanl (+) n
-}