{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NamedFieldPuns #-}
module Ouroboros.Network.PeerSharing
( PeerSharingController
, PeerSharingRegistry (..)
, newPeerSharingRegistry
, bracketPeerSharingClient
, peerSharingClient
, peerSharingServer
, requestPeers
, PeerSharingAPI (..)
, newPeerSharingAPI
, ps_POLICY_PEER_SHARE_STICKY_TIME
, ps_POLICY_PEER_SHARE_MAX_PEERS
, PeerSharingResult (..)
) where
import Control.Applicative (Alternative)
import Control.Concurrent.Class.MonadMVar (MVar, MonadMVar (putMVar),
newEmptyMVar, takeMVar)
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Monad (when)
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Data.Hashable (Hashable (..))
import Data.List (sortBy)
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Monoid.Synchronisation (FirstToFinish (..), runFirstToFinish)
import Data.Set qualified as Set
import Ouroboros.Network.ControlMessage (ControlMessage (..), ControlMessageSTM)
import Ouroboros.Network.PeerSelection.Governor.Types (PublicPeerSelectionState,
availableToShare)
import Ouroboros.Network.Protocol.PeerSharing.Client (PeerSharingClient (..))
import Ouroboros.Network.Protocol.PeerSharing.Server (PeerSharingServer (..))
import Ouroboros.Network.Protocol.PeerSharing.Type (PeerSharingAmount (..),
PeerSharingResult (..))
import System.Random
newtype PeerSharingController peer m = PeerSharingController {
forall peer (m :: * -> *).
PeerSharingController peer m
-> StrictTMVar m (PeerSharingAmount, MVar m [peer])
requestQueue :: StrictTMVar m (PeerSharingAmount, MVar m [peer])
}
requestPeers :: (MonadMVar m, MonadSTM m)
=> PeerSharingController peer m -> PeerSharingAmount -> m [peer]
requestPeers :: forall (m :: * -> *) peer.
(MonadMVar m, MonadSTM m) =>
PeerSharingController peer m -> PeerSharingAmount -> m [peer]
requestPeers (PeerSharingController StrictTMVar m (PeerSharingAmount, MVar m [peer])
requestQueue) PeerSharingAmount
amount = do
res <- m (MVar m [peer])
forall a. m (MVar m a)
forall (m :: * -> *) a. MonadMVar m => m (MVar m a)
newEmptyMVar
atomically $ putTMVar requestQueue (amount, res)
takeMVar res
newtype PeerSharingRegistry peer m = PeerSharingRegistry {
forall peer (m :: * -> *).
PeerSharingRegistry peer m
-> StrictTVar m (Map peer (PeerSharingController peer m))
getPeerSharingRegistry :: StrictTVar m (Map peer (PeerSharingController peer m))
}
newPeerSharingRegistry :: (MonadSTM m, Ord peer)
=> m (PeerSharingRegistry peer m)
newPeerSharingRegistry :: forall (m :: * -> *) peer.
(MonadSTM m, Ord peer) =>
m (PeerSharingRegistry peer m)
newPeerSharingRegistry = StrictTVar m (Map peer (PeerSharingController peer m))
-> PeerSharingRegistry peer m
forall peer (m :: * -> *).
StrictTVar m (Map peer (PeerSharingController peer m))
-> PeerSharingRegistry peer m
PeerSharingRegistry (StrictTVar m (Map peer (PeerSharingController peer m))
-> PeerSharingRegistry peer m)
-> m (StrictTVar m (Map peer (PeerSharingController peer m)))
-> m (PeerSharingRegistry peer m)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Map peer (PeerSharingController peer m)
-> m (StrictTVar m (Map peer (PeerSharingController peer m)))
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO Map peer (PeerSharingController peer m)
forall a. Monoid a => a
mempty
bracketPeerSharingClient :: (Ord peer, MonadSTM m, MonadThrow m)
=> PeerSharingRegistry peer m
-> peer
-> (PeerSharingController peer m -> m a)
-> m a
bracketPeerSharingClient :: forall peer (m :: * -> *) a.
(Ord peer, MonadSTM m, MonadThrow m) =>
PeerSharingRegistry peer m
-> peer -> (PeerSharingController peer m -> m a) -> m a
bracketPeerSharingClient (PeerSharingRegistry StrictTVar m (Map peer (PeerSharingController peer m))
registry) peer
peer PeerSharingController peer m -> m a
k = do
newPSController <- StrictTMVar m (PeerSharingAmount, MVar m [peer])
-> PeerSharingController peer m
forall peer (m :: * -> *).
StrictTMVar m (PeerSharingAmount, MVar m [peer])
-> PeerSharingController peer m
PeerSharingController (StrictTMVar m (PeerSharingAmount, MVar m [peer])
-> PeerSharingController peer m)
-> m (StrictTMVar m (PeerSharingAmount, MVar m [peer]))
-> m (PeerSharingController peer m)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> m (StrictTMVar m (PeerSharingAmount, MVar m [peer]))
forall (m :: * -> *) a. MonadSTM m => m (StrictTMVar m a)
newEmptyTMVarIO
bracket (atomically (modifyTVar registry (Map.insert peer newPSController)))
(\()
_ -> STM m () -> m ()
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (StrictTVar m (Map peer (PeerSharingController peer m))
-> (Map peer (PeerSharingController peer m)
-> Map peer (PeerSharingController peer m))
-> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> (a -> a) -> STM m ()
modifyTVar StrictTVar m (Map peer (PeerSharingController peer m))
registry (peer
-> Map peer (PeerSharingController peer m)
-> Map peer (PeerSharingController peer m)
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete peer
peer)))
(\()
_ -> PeerSharingController peer m -> m a
k PeerSharingController peer m
newPSController)
data PeerSharingError =
PeerSharingProtocolViolation PeerSharingAmount Int
deriving Int -> PeerSharingError -> ShowS
[PeerSharingError] -> ShowS
PeerSharingError -> String
(Int -> PeerSharingError -> ShowS)
-> (PeerSharingError -> String)
-> ([PeerSharingError] -> ShowS)
-> Show PeerSharingError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> PeerSharingError -> ShowS
showsPrec :: Int -> PeerSharingError -> ShowS
$cshow :: PeerSharingError -> String
show :: PeerSharingError -> String
$cshowList :: [PeerSharingError] -> ShowS
showList :: [PeerSharingError] -> ShowS
Show
instance Exception PeerSharingError
peerSharingClient :: ( Alternative (STM m)
, MonadMVar m
, MonadSTM m
, MonadThrow m
)
=> ControlMessageSTM m
-> PeerSharingController peer m
-> m (PeerSharingClient peer m ())
peerSharingClient :: forall (m :: * -> *) peer.
(Alternative (STM m), MonadMVar m, MonadSTM m, MonadThrow m) =>
ControlMessageSTM m
-> PeerSharingController peer m -> m (PeerSharingClient peer m ())
peerSharingClient ControlMessageSTM m
controlMessageSTM
psc :: PeerSharingController peer m
psc@PeerSharingController { StrictTMVar m (PeerSharingAmount, MVar m [peer])
requestQueue :: forall peer (m :: * -> *).
PeerSharingController peer m
-> StrictTMVar m (PeerSharingAmount, MVar m [peer])
requestQueue :: StrictTMVar m (PeerSharingAmount, MVar m [peer])
requestQueue } = do
mbTerminated <- STM m (Maybe (PeerSharingAmount, MVar m [peer]))
-> m (Maybe (PeerSharingAmount, MVar m [peer]))
forall a. HasCallStack => STM m a -> m a
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically
(STM m (Maybe (PeerSharingAmount, MVar m [peer]))
-> m (Maybe (PeerSharingAmount, MVar m [peer])))
-> STM m (Maybe (PeerSharingAmount, MVar m [peer]))
-> m (Maybe (PeerSharingAmount, MVar m [peer]))
forall a b. (a -> b) -> a -> b
$ FirstToFinish (STM m) (Maybe (PeerSharingAmount, MVar m [peer]))
-> STM m (Maybe (PeerSharingAmount, MVar m [peer]))
forall (m :: * -> *) a. FirstToFinish m a -> m a
runFirstToFinish
(FirstToFinish (STM m) (Maybe (PeerSharingAmount, MVar m [peer]))
-> STM m (Maybe (PeerSharingAmount, MVar m [peer])))
-> FirstToFinish (STM m) (Maybe (PeerSharingAmount, MVar m [peer]))
-> STM m (Maybe (PeerSharingAmount, MVar m [peer]))
forall a b. (a -> b) -> a -> b
$ STM m (Maybe (PeerSharingAmount, MVar m [peer]))
-> FirstToFinish (STM m) (Maybe (PeerSharingAmount, MVar m [peer]))
forall (m :: * -> *) a. m a -> FirstToFinish m a
FirstToFinish ((PeerSharingAmount, MVar m [peer])
-> Maybe (PeerSharingAmount, MVar m [peer])
forall a. a -> Maybe a
Just ((PeerSharingAmount, MVar m [peer])
-> Maybe (PeerSharingAmount, MVar m [peer]))
-> STM m (PeerSharingAmount, MVar m [peer])
-> STM m (Maybe (PeerSharingAmount, MVar m [peer]))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StrictTMVar m (PeerSharingAmount, MVar m [peer])
-> STM m (PeerSharingAmount, MVar m [peer])
forall (m :: * -> *) a. MonadSTM m => StrictTMVar m a -> STM m a
takeTMVar StrictTMVar m (PeerSharingAmount, MVar m [peer])
requestQueue)
FirstToFinish (STM m) (Maybe (PeerSharingAmount, MVar m [peer]))
-> FirstToFinish (STM m) (Maybe (PeerSharingAmount, MVar m [peer]))
-> FirstToFinish (STM m) (Maybe (PeerSharingAmount, MVar m [peer]))
forall a. Semigroup a => a -> a -> a
<> STM m (Maybe (PeerSharingAmount, MVar m [peer]))
-> FirstToFinish (STM m) (Maybe (PeerSharingAmount, MVar m [peer]))
forall (m :: * -> *) a. m a -> FirstToFinish m a
FirstToFinish (do controlMessage <- ControlMessageSTM m
controlMessageSTM
case controlMessage of
ControlMessage
Terminate -> Maybe (PeerSharingAmount, MVar m [peer])
-> STM m (Maybe (PeerSharingAmount, MVar m [peer]))
forall a. a -> STM m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (PeerSharingAmount, MVar m [peer])
forall a. Maybe a
Nothing
ControlMessage
_ -> STM m (Maybe (PeerSharingAmount, MVar m [peer]))
forall a. STM m a
forall (m :: * -> *) a. MonadSTM m => STM m a
retry
)
case mbTerminated of
Maybe (PeerSharingAmount, MVar m [peer])
Nothing -> PeerSharingClient peer m () -> m (PeerSharingClient peer m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return
(PeerSharingClient peer m () -> m (PeerSharingClient peer m ()))
-> PeerSharingClient peer m () -> m (PeerSharingClient peer m ())
forall a b. (a -> b) -> a -> b
$ m () -> PeerSharingClient peer m ()
forall (m :: * -> *) a peerAddress.
m a -> PeerSharingClient peerAddress m a
SendMsgDone (() -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
Just (PeerSharingAmount
amount, MVar m [peer]
resultQueue) -> PeerSharingClient peer m () -> m (PeerSharingClient peer m ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (PeerSharingClient peer m () -> m (PeerSharingClient peer m ()))
-> PeerSharingClient peer m () -> m (PeerSharingClient peer m ())
forall a b. (a -> b) -> a -> b
$
PeerSharingAmount
-> ([peer] -> m (PeerSharingClient peer m ()))
-> PeerSharingClient peer m ()
forall peerAddress (m :: * -> *) a.
PeerSharingAmount
-> ([peerAddress] -> m (PeerSharingClient peerAddress m a))
-> PeerSharingClient peerAddress m a
SendMsgShareRequest PeerSharingAmount
amount (([peer] -> m (PeerSharingClient peer m ()))
-> PeerSharingClient peer m ())
-> ([peer] -> m (PeerSharingClient peer m ()))
-> PeerSharingClient peer m ()
forall a b. (a -> b) -> a -> b
$ \[peer]
result -> do
let numOfReceived :: Int
numOfReceived = [peer] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [peer]
result
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
numOfReceived Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> PeerSharingAmount -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral PeerSharingAmount
amount) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
PeerSharingError -> m ()
forall e a. Exception e => e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (PeerSharingAmount -> Int -> PeerSharingError
PeerSharingProtocolViolation PeerSharingAmount
amount Int
numOfReceived)
MVar m [peer] -> [peer] -> m ()
forall a. MVar m a -> a -> m ()
forall (m :: * -> *) a. MonadMVar m => MVar m a -> a -> m ()
putMVar MVar m [peer]
resultQueue [peer]
result
ControlMessageSTM m
-> PeerSharingController peer m -> m (PeerSharingClient peer m ())
forall (m :: * -> *) peer.
(Alternative (STM m), MonadMVar m, MonadSTM m, MonadThrow m) =>
ControlMessageSTM m
-> PeerSharingController peer m -> m (PeerSharingClient peer m ())
peerSharingClient ControlMessageSTM m
controlMessageSTM PeerSharingController peer m
psc
peerSharingServer :: ( MonadSTM m
, MonadMonotonicTime m
, Hashable peer
, RandomGen s
)
=> PeerSharingAPI peer s m
-> PeerSharingServer peer m
peerSharingServer :: forall (m :: * -> *) peer s.
(MonadSTM m, MonadMonotonicTime m, Hashable peer, RandomGen s) =>
PeerSharingAPI peer s m -> PeerSharingServer peer m
peerSharingServer PeerSharingAPI peer s m
peerSharingAPI =
PeerSharingServer
{ recvMsgShareRequest :: PeerSharingAmount -> m ([peer], PeerSharingServer peer m)
recvMsgShareRequest = \PeerSharingAmount
amount -> (,) ([peer]
-> PeerSharingServer peer m -> ([peer], PeerSharingServer peer m))
-> m [peer]
-> m (PeerSharingServer peer m
-> ([peer], PeerSharingServer peer m))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> PeerSharingAPI peer s m -> PeerSharingAmount -> m [peer]
forall (m :: * -> *) ntnAddr s.
(MonadSTM m, MonadMonotonicTime m, Hashable ntnAddr,
RandomGen s) =>
PeerSharingAPI ntnAddr s m -> PeerSharingAmount -> m [ntnAddr]
computePeerSharingPeers PeerSharingAPI peer s m
peerSharingAPI PeerSharingAmount
amount
m (PeerSharingServer peer m -> ([peer], PeerSharingServer peer m))
-> m (PeerSharingServer peer m)
-> m ([peer], PeerSharingServer peer m)
forall a b. m (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> PeerSharingServer peer m -> m (PeerSharingServer peer m)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (PeerSharingAPI peer s m -> PeerSharingServer peer m
forall (m :: * -> *) peer s.
(MonadSTM m, MonadMonotonicTime m, Hashable peer, RandomGen s) =>
PeerSharingAPI peer s m -> PeerSharingServer peer m
peerSharingServer PeerSharingAPI peer s m
peerSharingAPI)
}
data PeerSharingAPI addr s m =
PeerSharingAPI { forall addr s (m :: * -> *).
PeerSharingAPI addr s m
-> StrictTVar m (PublicPeerSelectionState addr)
psPublicPeerSelectionStateVar :: StrictTVar m (PublicPeerSelectionState addr)
, forall addr s (m :: * -> *).
PeerSharingAPI addr s m -> StrictTVar m s
psGenVar :: StrictTVar m s
, forall addr s (m :: * -> *).
PeerSharingAPI addr s m -> StrictTVar m Time
psReSaltAtVar :: StrictTVar m Time
, forall addr s (m :: * -> *). PeerSharingAPI addr s m -> DiffTime
psPolicyPeerShareStickyTime :: !DiffTime
, forall addr s (m :: * -> *).
PeerSharingAPI addr s m -> PeerSharingAmount
psPolicyPeerShareMaxPeers :: !PeerSharingAmount
}
ps_POLICY_PEER_SHARE_STICKY_TIME :: DiffTime
ps_POLICY_PEER_SHARE_STICKY_TIME :: DiffTime
ps_POLICY_PEER_SHARE_STICKY_TIME = DiffTime
823
ps_POLICY_PEER_SHARE_MAX_PEERS :: PeerSharingAmount
ps_POLICY_PEER_SHARE_MAX_PEERS :: PeerSharingAmount
ps_POLICY_PEER_SHARE_MAX_PEERS = PeerSharingAmount
10
newPeerSharingAPI :: MonadSTM m
=> StrictTVar m (PublicPeerSelectionState addr)
-> s
-> DiffTime
-> PeerSharingAmount
-> m (PeerSharingAPI addr s m)
newPeerSharingAPI :: forall (m :: * -> *) addr s.
MonadSTM m =>
StrictTVar m (PublicPeerSelectionState addr)
-> s
-> DiffTime
-> PeerSharingAmount
-> m (PeerSharingAPI addr s m)
newPeerSharingAPI StrictTVar m (PublicPeerSelectionState addr)
publicPeerSelectionStateVar
s
rng
DiffTime
policyPeerShareStickyTime
PeerSharingAmount
policyPeerShareMaxPeers = do
genVar <- s -> m (StrictTVar m s)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO s
rng
reSaltAtVar <- newTVarIO (Time 0)
return $
PeerSharingAPI { psPublicPeerSelectionStateVar = publicPeerSelectionStateVar,
psGenVar = genVar,
psReSaltAtVar = reSaltAtVar,
psPolicyPeerShareStickyTime = policyPeerShareStickyTime,
psPolicyPeerShareMaxPeers = policyPeerShareMaxPeers
}
computePeerSharingPeers :: ( MonadSTM m
, MonadMonotonicTime m
, Hashable ntnAddr
, RandomGen s
)
=> PeerSharingAPI ntnAddr s m
-> PeerSharingAmount
-> m [ntnAddr]
computePeerSharingPeers :: forall (m :: * -> *) ntnAddr s.
(MonadSTM m, MonadMonotonicTime m, Hashable ntnAddr,
RandomGen s) =>
PeerSharingAPI ntnAddr s m -> PeerSharingAmount -> m [ntnAddr]
computePeerSharingPeers PeerSharingAPI{ StrictTVar m (PublicPeerSelectionState ntnAddr)
psPublicPeerSelectionStateVar :: forall addr s (m :: * -> *).
PeerSharingAPI addr s m
-> StrictTVar m (PublicPeerSelectionState addr)
psPublicPeerSelectionStateVar :: StrictTVar m (PublicPeerSelectionState ntnAddr)
psPublicPeerSelectionStateVar,
DiffTime
psPolicyPeerShareStickyTime :: forall addr s (m :: * -> *). PeerSharingAPI addr s m -> DiffTime
psPolicyPeerShareStickyTime :: DiffTime
psPolicyPeerShareStickyTime,
PeerSharingAmount
psPolicyPeerShareMaxPeers :: forall addr s (m :: * -> *).
PeerSharingAPI addr s m -> PeerSharingAmount
psPolicyPeerShareMaxPeers :: PeerSharingAmount
psPolicyPeerShareMaxPeers,
StrictTVar m Time
psReSaltAtVar :: forall addr s (m :: * -> *).
PeerSharingAPI addr s m -> StrictTVar m Time
psReSaltAtVar :: StrictTVar m Time
psReSaltAtVar,
StrictTVar m s
psGenVar :: forall addr s (m :: * -> *).
PeerSharingAPI addr s m -> StrictTVar m s
psGenVar :: StrictTVar m s
psGenVar
} PeerSharingAmount
amount = do
now <- m Time
forall (m :: * -> *). MonadMonotonicTime m => m Time
getMonotonicTime
publicState <- readTVarIO psPublicPeerSelectionStateVar
salt <- atomically $ do
reSaltAt <- readTVar psReSaltAtVar
if reSaltAt <= now
then do
writeTVar psReSaltAtVar $ addTime psPolicyPeerShareStickyTime now
stateTVar psGenVar random
else do
gen <- readTVar psGenVar
return $ fst $ random gen
let availableToShareSet = PublicPeerSelectionState ntnAddr -> Set ntnAddr
forall peeraddr. PublicPeerSelectionState peeraddr -> Set peeraddr
availableToShare PublicPeerSelectionState ntnAddr
publicState
randomList = Int -> [ntnAddr] -> [ntnAddr]
forall a. Int -> [a] -> [a]
take (PeerSharingAmount -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral PeerSharingAmount
psPolicyPeerShareMaxPeers
Int -> Int -> Int
forall a. Ord a => a -> a -> a
`min`
PeerSharingAmount -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral PeerSharingAmount
amount)
([ntnAddr] -> [ntnAddr]) -> [ntnAddr] -> [ntnAddr]
forall a b. (a -> b) -> a -> b
$ (ntnAddr -> ntnAddr -> Ordering) -> [ntnAddr] -> [ntnAddr]
forall a. (a -> a -> Ordering) -> [a] -> [a]
sortBy (\ntnAddr
a ntnAddr
b -> Int -> Int -> Ordering
forall a. Ord a => a -> a -> Ordering
compare (Int -> ntnAddr -> Int
forall a. Hashable a => Int -> a -> Int
hashWithSalt Int
salt ntnAddr
a) (Int -> ntnAddr -> Int
forall a. Hashable a => Int -> a -> Int
hashWithSalt Int
salt ntnAddr
b))
([ntnAddr] -> [ntnAddr]) -> [ntnAddr] -> [ntnAddr]
forall a b. (a -> b) -> a -> b
$ Set ntnAddr -> [ntnAddr]
forall a. Set a -> [a]
Set.elems Set ntnAddr
availableToShareSet
if null availableToShareSet
then return []
else return randomList