Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dmq-node/app/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import DMQ.Configuration.CLIOptions (parseCLIOptions)
import DMQ.Configuration.Topology (readTopologyFile)
import DMQ.Diffusion.Applications (diffusionApplications)
import DMQ.Diffusion.Arguments
import DMQ.Diffusion.NodeKernel
import DMQ.Diffusion.NodeKernel as NodeKernel
import DMQ.Diffusion.PeerSelection (policy)
import DMQ.Handlers.TopLevel (toplevelExceptionHandler)
import DMQ.NodeToClient qualified as NtC
Expand Down Expand Up @@ -247,7 +247,7 @@ runDMQ commandLineConfig = do
dmqLimitsAndTimeouts
dmqNtNApps
dmqNtCApps
(policy policyRngVar)
(policy policyRngVar nodeKernel.peerMetrics)

Diffusion.run dmqDiffusionArguments
dmqDiffusionTracers
Expand Down
2 changes: 2 additions & 0 deletions dmq-node/dmq-node.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ library
DMQ.NodeToClient.Version
DMQ.NodeToNode
DMQ.NodeToNode.Version
DMQ.PeerSelection.PeerMetric
DMQ.Policy
DMQ.Protocol.LocalMsgNotification.Client
DMQ.Protocol.LocalMsgNotification.Codec
Expand Down Expand Up @@ -131,6 +132,7 @@ library
optparse-applicative >=0.18 && <0.20,
ouroboros-consensus:{ouroboros-consensus, cardano, diffusion},
ouroboros-network:{ouroboros-network, api, framework, framework-tracing, orphan-instances, protocols, tracing} ^>=1.1.0.0,
psqueues,
quiet,
random ^>=1.3,
singletons,
Expand Down
4 changes: 4 additions & 0 deletions dmq-node/src/DMQ/Diffusion/NodeKernel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import Ouroboros.Network.TxSubmission.Mempool.Simple qualified as Mempool

import DMQ.Configuration
import DMQ.Diffusion.NodeKernel.Types
import DMQ.PeerSelection.PeerMetric (mkPeerMetrics)
import DMQ.Policy qualified as Policy
import DMQ.Protocol.SigSubmission.Type (Sig (sigExpiresAt, sigId), SigId)
import DMQ.Tracer
Expand Down Expand Up @@ -94,6 +95,8 @@ newNodeKernel rng = do
ps_POLICY_PEER_SHARE_STICKY_TIME
ps_POLICY_PEER_SHARE_MAX_PEERS

peerMetrics <- mkPeerMetrics

pure NodeKernel { fetchClientRegistry
, peerSharingRegistry
, peerSharingAPI
Expand All @@ -103,6 +106,7 @@ newNodeKernel rng = do
, sigSharedTxStateVar
, nextEpochVar
, stakePools
, peerMetrics
}


Expand Down
2 changes: 2 additions & 0 deletions dmq-node/src/DMQ/Diffusion/NodeKernel/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import Ouroboros.Network.PeerSharing (PeerSharingAPI, PeerSharingRegistry)
import Ouroboros.Network.TxSubmission.Inbound.V2
import Ouroboros.Network.TxSubmission.Mempool.Simple (Mempool (..))

import DMQ.PeerSelection.PeerMetric (PeerMetrics)
import DMQ.Protocol.SigSubmission.Type (Sig, SigId)


Expand All @@ -44,6 +45,7 @@ data NodeKernel crypto ntnAddr m =
, sigSharedTxStateVar :: !(SharedTxStateVar m ntnAddr SigId (Sig crypto))
, stakePools :: !(StakePools m)
, nextEpochVar :: !(StrictTVar m (Maybe UTCTime))
, peerMetrics :: !(PeerMetrics m SigId ntnAddr)
}


Expand Down
14 changes: 10 additions & 4 deletions dmq-node/src/DMQ/Diffusion/PeerSelection.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,23 @@ import Data.List (sortOn, unfoldr)
import Data.Map.Strict qualified as Map
import Data.Set qualified as Set
import Data.Word (Word32)
import Ouroboros.Network.PeerSelection
import System.Random (Random (..), StdGen, splitGen)

import DMQ.PeerSelection.PeerMetric (PeerMetrics)
import DMQ.PeerSelection.PeerMetric qualified as PeerMetric

import Ouroboros.Network.PeerSelection hiding (PeerMetrics)

-- | Trivial peer selection policy used as dummy value
--
policy :: forall peerAddr m.
policy :: forall sigId peerAddr m.
( MonadSTM m
, Ord peerAddr
)
=> StrictTVar m StdGen
-> PeerMetrics m sigId peerAddr
-> PeerSelectionPolicy peerAddr m
policy rngVar =
policy rngVar peerMetrics =
PeerSelectionPolicy {
policyPickKnownPeersForPeerShare = simplePromotionPolicy,
policyPickColdPeersToPromote = simplePromotionPolicy,
Expand All @@ -40,10 +45,11 @@ policy rngVar =
hotDemotionPolicy :: PickPolicy peerAddr (STM m)
hotDemotionPolicy _ _ _ available pickNum = do
available' <- addRand rngVar available (,)
scores <- PeerMetric.announciness peerMetrics
return $ Set.fromList
. map fst
. take pickNum
. sortOn snd
. sortOn (\(peer, rn) -> (Map.findWithDefault 0 peer scores , rn))
. Map.assocs
$ available'

Expand Down
38 changes: 34 additions & 4 deletions dmq-node/src/DMQ/NodeToNode.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadST
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTimer.SI
import "contra-tracer" Control.Tracer (Tracer, nullTracer)
import "contra-tracer" Control.Tracer (Tracer, nullTracer, traceWith)

import Codec.CBOR.Decoding qualified as CBOR
import Codec.CBOR.Encoding qualified as CBOR
import Codec.CBOR.Read qualified as CBOR
import Data.ByteString.Lazy qualified as BL
import Data.Foldable (traverse_)
import Data.Functor.Contravariant ((>$<))
import Data.Functor.Identity (Identity (..))
import Data.Hashable (Hashable)
import Data.Typeable
import Data.Void (Void)
Expand All @@ -55,6 +57,8 @@ import Cardano.KESAgent.KES.Crypto (Crypto (..))
import DMQ.Configuration (Configuration)
import DMQ.Diffusion.NodeKernel.Types (NodeKernel (..))
import DMQ.NodeToNode.Version
import DMQ.PeerSelection.PeerMetric (ReportPeerMetrics' (..))
import DMQ.PeerSelection.PeerMetric qualified as PeerMetric
import DMQ.Policy qualified as Policy
import DMQ.Protocol.SigSubmission.Codec (byteLimitsSigSubmission,
codecSigSubmission, timeLimitsSigSubmission)
Expand Down Expand Up @@ -207,6 +211,7 @@ ntnApps
, sigChannelVar
, sigMempoolSem
, sigSharedTxStateVar
, peerMetrics
}
Codecs {
sigSubmissionCodecV1
Expand Down Expand Up @@ -247,17 +252,37 @@ ntnApps
eicConnectionId = connId,
eicControlMessage = controlMessage
} channel =
let reportPeerMetrics@ReportPeerMetrics { reportSig } =
PeerMetric.hoist (Mx.TraceLabelPeer (remoteAddress connId) . runIdentity)
. PeerMetric.reportMetric
Policy.peerMetricsConfiguration
$ peerMetrics

-- Modified mempool writer which reports signatures to `PeerMetric`.
mempoolWriter' = mempoolWriter
{ mempoolAddTxs = \sigs -> do
res@(validSigIds, invalidSigIds) <- mempoolAddTxs mempoolWriter sigs
atomically $ do
traverse_ (\sigid ->
traceWith reportSig $ Identity (sigid, PeerMetric.Valid))
validSigIds
traverse_ (\(sigid, _) ->
traceWith reportSig $ Identity (sigid, PeerMetric.NotValidOrNotReceived))
invalidSigIds
return res
}
in
withPeer
(Mx.WithBearer connId >$< sigSubmissionLogicPeerTracer)
sigChannelVar
sigMempoolSem
sigDecisionPolicy
sigSharedTxStateVar
mempoolReader
mempoolWriter
mempoolWriter'
sigSize
(remoteAddress connId)
$ \(peerSigAPI :: PeerTxAPI m SigId (Sig crypto)) ->
( \(peerSigAPI :: PeerTxAPI m SigId (Sig crypto)) ->
runPipelinedAnnotatedPeerWithLimits
(Mx.WithBearer connId >$< sigSubmissionV2ProtocolTracer)
sigSubmissionCodecV2
Expand All @@ -267,9 +292,14 @@ ntnApps
$ sigSubmissionV2InboundPeerPipelined
$ sigSubmissionInbound
(Mx.WithBearer connId >$< sigSubmissionInboundTracer)
mempoolWriter
mempoolWriter'
peerSigAPI
reportPeerMetrics
controlMessage
)
`finally`
-- Remove the peer from `PeerMetric`.
PeerMetric.erasePeer (remoteAddress connId) peerMetrics

aSigSubmissionV1Client
:: NodeToNodeVersion
Expand Down
Loading
Loading