diff --git a/dmq-node/app/Main.hs b/dmq-node/app/Main.hs index b43819d..bdb62b7 100644 --- a/dmq-node/app/Main.hs +++ b/dmq-node/app/Main.hs @@ -46,7 +46,7 @@ import DMQ.Configuration.CLIOptions (parseCLIOptions) import DMQ.Configuration.Topology (readTopologyFile) import DMQ.Diffusion.Applications (diffusionApplications) import DMQ.Diffusion.Arguments -import DMQ.Diffusion.NodeKernel +import DMQ.Diffusion.NodeKernel as NodeKernel import DMQ.Diffusion.PeerSelection (policy) import DMQ.Handlers.TopLevel (toplevelExceptionHandler) import DMQ.NodeToClient qualified as NtC @@ -247,7 +247,7 @@ runDMQ commandLineConfig = do dmqLimitsAndTimeouts dmqNtNApps dmqNtCApps - (policy policyRngVar) + (policy policyRngVar nodeKernel.peerMetrics) Diffusion.run dmqDiffusionArguments dmqDiffusionTracers diff --git a/dmq-node/dmq-node.cabal b/dmq-node/dmq-node.cabal index 70e4f22..b57917f 100644 --- a/dmq-node/dmq-node.cabal +++ b/dmq-node/dmq-node.cabal @@ -74,6 +74,7 @@ library DMQ.NodeToClient.Version DMQ.NodeToNode DMQ.NodeToNode.Version + DMQ.PeerSelection.PeerMetric DMQ.Policy DMQ.Protocol.LocalMsgNotification.Client DMQ.Protocol.LocalMsgNotification.Codec @@ -131,6 +132,7 @@ library optparse-applicative >=0.18 && <0.20, ouroboros-consensus:{ouroboros-consensus, cardano, diffusion}, ouroboros-network:{ouroboros-network, api, framework, framework-tracing, orphan-instances, protocols, tracing} ^>=1.1.0.0, + psqueues, quiet, random ^>=1.3, singletons, diff --git a/dmq-node/src/DMQ/Diffusion/NodeKernel.hs b/dmq-node/src/DMQ/Diffusion/NodeKernel.hs index fe8980d..b3ff3ba 100644 --- a/dmq-node/src/DMQ/Diffusion/NodeKernel.hs +++ b/dmq-node/src/DMQ/Diffusion/NodeKernel.hs @@ -41,6 +41,7 @@ import Ouroboros.Network.TxSubmission.Mempool.Simple qualified as Mempool import DMQ.Configuration import DMQ.Diffusion.NodeKernel.Types +import DMQ.PeerSelection.PeerMetric (mkPeerMetrics) import DMQ.Policy qualified as Policy import DMQ.Protocol.SigSubmission.Type (Sig (sigExpiresAt, sigId), SigId) import DMQ.Tracer @@ -94,6 +95,8 @@ newNodeKernel rng = do ps_POLICY_PEER_SHARE_STICKY_TIME ps_POLICY_PEER_SHARE_MAX_PEERS + peerMetrics <- mkPeerMetrics + pure NodeKernel { fetchClientRegistry , peerSharingRegistry , peerSharingAPI @@ -103,6 +106,7 @@ newNodeKernel rng = do , sigSharedTxStateVar , nextEpochVar , stakePools + , peerMetrics } diff --git a/dmq-node/src/DMQ/Diffusion/NodeKernel/Types.hs b/dmq-node/src/DMQ/Diffusion/NodeKernel/Types.hs index 0a7149d..d020a43 100644 --- a/dmq-node/src/DMQ/Diffusion/NodeKernel/Types.hs +++ b/dmq-node/src/DMQ/Diffusion/NodeKernel/Types.hs @@ -26,6 +26,7 @@ import Ouroboros.Network.PeerSharing (PeerSharingAPI, PeerSharingRegistry) import Ouroboros.Network.TxSubmission.Inbound.V2 import Ouroboros.Network.TxSubmission.Mempool.Simple (Mempool (..)) +import DMQ.PeerSelection.PeerMetric (PeerMetrics) import DMQ.Protocol.SigSubmission.Type (Sig, SigId) @@ -44,6 +45,7 @@ data NodeKernel crypto ntnAddr m = , sigSharedTxStateVar :: !(SharedTxStateVar m ntnAddr SigId (Sig crypto)) , stakePools :: !(StakePools m) , nextEpochVar :: !(StrictTVar m (Maybe UTCTime)) + , peerMetrics :: !(PeerMetrics m SigId ntnAddr) } diff --git a/dmq-node/src/DMQ/Diffusion/PeerSelection.hs b/dmq-node/src/DMQ/Diffusion/PeerSelection.hs index 39996fb..e63c0ec 100644 --- a/dmq-node/src/DMQ/Diffusion/PeerSelection.hs +++ b/dmq-node/src/DMQ/Diffusion/PeerSelection.hs @@ -5,18 +5,23 @@ import Data.List (sortOn, unfoldr) import Data.Map.Strict qualified as Map import Data.Set qualified as Set import Data.Word (Word32) -import Ouroboros.Network.PeerSelection import System.Random (Random (..), StdGen, splitGen) +import DMQ.PeerSelection.PeerMetric (PeerMetrics) +import DMQ.PeerSelection.PeerMetric qualified as PeerMetric + +import Ouroboros.Network.PeerSelection hiding (PeerMetrics) + -- | Trivial peer selection policy used as dummy value -- -policy :: forall peerAddr m. +policy :: forall sigId peerAddr m. ( MonadSTM m , Ord peerAddr ) => StrictTVar m StdGen + -> PeerMetrics m sigId peerAddr -> PeerSelectionPolicy peerAddr m -policy rngVar = +policy rngVar peerMetrics = PeerSelectionPolicy { policyPickKnownPeersForPeerShare = simplePromotionPolicy, policyPickColdPeersToPromote = simplePromotionPolicy, @@ -40,10 +45,11 @@ policy rngVar = hotDemotionPolicy :: PickPolicy peerAddr (STM m) hotDemotionPolicy _ _ _ available pickNum = do available' <- addRand rngVar available (,) + scores <- PeerMetric.announciness peerMetrics return $ Set.fromList . map fst . take pickNum - . sortOn snd + . sortOn (\(peer, rn) -> (Map.findWithDefault 0 peer scores , rn)) . Map.assocs $ available' diff --git a/dmq-node/src/DMQ/NodeToNode.hs b/dmq-node/src/DMQ/NodeToNode.hs index bd26f1f..f93815d 100644 --- a/dmq-node/src/DMQ/NodeToNode.hs +++ b/dmq-node/src/DMQ/NodeToNode.hs @@ -33,13 +33,15 @@ import Control.Monad.Class.MonadFork import Control.Monad.Class.MonadST import Control.Monad.Class.MonadThrow import Control.Monad.Class.MonadTimer.SI -import "contra-tracer" Control.Tracer (Tracer, nullTracer) +import "contra-tracer" Control.Tracer (Tracer, nullTracer, traceWith) import Codec.CBOR.Decoding qualified as CBOR import Codec.CBOR.Encoding qualified as CBOR import Codec.CBOR.Read qualified as CBOR import Data.ByteString.Lazy qualified as BL +import Data.Foldable (traverse_) import Data.Functor.Contravariant ((>$<)) +import Data.Functor.Identity (Identity (..)) import Data.Hashable (Hashable) import Data.Typeable import Data.Void (Void) @@ -55,6 +57,8 @@ import Cardano.KESAgent.KES.Crypto (Crypto (..)) import DMQ.Configuration (Configuration) import DMQ.Diffusion.NodeKernel.Types (NodeKernel (..)) import DMQ.NodeToNode.Version +import DMQ.PeerSelection.PeerMetric (ReportPeerMetrics' (..)) +import DMQ.PeerSelection.PeerMetric qualified as PeerMetric import DMQ.Policy qualified as Policy import DMQ.Protocol.SigSubmission.Codec (byteLimitsSigSubmission, codecSigSubmission, timeLimitsSigSubmission) @@ -207,6 +211,7 @@ ntnApps , sigChannelVar , sigMempoolSem , sigSharedTxStateVar + , peerMetrics } Codecs { sigSubmissionCodecV1 @@ -247,6 +252,26 @@ ntnApps eicConnectionId = connId, eicControlMessage = controlMessage } channel = + let reportPeerMetrics@ReportPeerMetrics { reportSig } = + PeerMetric.hoist (Mx.TraceLabelPeer (remoteAddress connId) . runIdentity) + . PeerMetric.reportMetric + Policy.peerMetricsConfiguration + $ peerMetrics + + -- Modified mempool writer which reports signatures to `PeerMetric`. + mempoolWriter' = mempoolWriter + { mempoolAddTxs = \sigs -> do + res@(validSigIds, invalidSigIds) <- mempoolAddTxs mempoolWriter sigs + atomically $ do + traverse_ (\sigid -> + traceWith reportSig $ Identity (sigid, PeerMetric.Valid)) + validSigIds + traverse_ (\(sigid, _) -> + traceWith reportSig $ Identity (sigid, PeerMetric.NotValidOrNotReceived)) + invalidSigIds + return res + } + in withPeer (Mx.WithBearer connId >$< sigSubmissionLogicPeerTracer) sigChannelVar @@ -254,10 +279,10 @@ ntnApps sigDecisionPolicy sigSharedTxStateVar mempoolReader - mempoolWriter + mempoolWriter' sigSize (remoteAddress connId) - $ \(peerSigAPI :: PeerTxAPI m SigId (Sig crypto)) -> + ( \(peerSigAPI :: PeerTxAPI m SigId (Sig crypto)) -> runPipelinedAnnotatedPeerWithLimits (Mx.WithBearer connId >$< sigSubmissionV2ProtocolTracer) sigSubmissionCodecV2 @@ -267,9 +292,14 @@ ntnApps $ sigSubmissionV2InboundPeerPipelined $ sigSubmissionInbound (Mx.WithBearer connId >$< sigSubmissionInboundTracer) - mempoolWriter + mempoolWriter' peerSigAPI + reportPeerMetrics controlMessage + ) + `finally` + -- Remove the peer from `PeerMetric`. + PeerMetric.erasePeer (remoteAddress connId) peerMetrics aSigSubmissionV1Client :: NodeToNodeVersion diff --git a/dmq-node/src/DMQ/PeerSelection/PeerMetric.hs b/dmq-node/src/DMQ/PeerSelection/PeerMetric.hs new file mode 100644 index 0000000..b3b9124 --- /dev/null +++ b/dmq-node/src/DMQ/PeerSelection/PeerMetric.hs @@ -0,0 +1,304 @@ +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE PackageImports #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module DMQ.PeerSelection.PeerMetric + ( ReportPeerMetrics' (..) + , ReportPeerMetricsI + , ReportPeerMetrics + , hoist + , nullMetrics + , reportMetric + , IsValid (..) + , PeerMetricsConfiguration (..) + , PeerMetrics + , mkPeerMetrics + , erasePeer + , announciness + ) where + +import Control.Concurrent.Class.MonadSTM.Strict +import Control.Monad.Class.MonadTime.SI +import "contra-tracer" Control.Tracer +import Data.Functor.Contravariant ((>$<)) +import Data.Functor.Identity (Identity) +import Data.List qualified as List +import Data.Map.Strict (Map) +import Data.Map.Strict qualified as Map +import Data.OrdPSQ (OrdPSQ) +import Data.OrdPSQ qualified as OrdPSQ + +import Network.Mux.Trace (TraceLabelPeer (..)) + + +newtype PeerMetricsConfiguration = PeerMetricsConfiguration { + timeWindowToKeep :: DiffTime + } + + +-- | Internal state of the metric. +-- +-- SigId enters `sigUnchecked` when we first hear about it from an upstream +-- peer. It's evacuated to `sigMetric` when we acknowledge (it could be either +-- valid, non-valid or unknown - e.g. we didn't receive it). If it was unknown +-- or invalid, it will be removed from `sigUnchecked` for that peer. +-- We only keep `timwWindowToKeep` oldest signatures in the state. +data PeerMetricsState sigid peeraddr = PeerMetricsState { + -- | We store all `SigId`s received `Time` when each peer received it. At + -- this point we don't know yet if it's a valid signature. + sigUnchecked :: Map sigid (OrdPSQ peeraddr Time ()) + + , -- | When a `Sig` for a given `SigId` was fetched, we will move it from + -- `sigUncheked` to `sigMetric`. We can also remove all `peeraddr` which + -- `Time` is greater than the received one, since none of them can win the + -- race. If `Sig` turned out to be invalid or we didn't receive it we + -- remove it from `sigUnchecked`. + sigMetric :: OrdPSQ sigid Time peeraddr + } + +-- | Mutable peer metrics state accessible via 'STM'. +-- +newtype PeerMetrics m sigid peeraddr = PeerMetrics { + peerMetricsVar :: StrictTVar m (PeerMetricsState sigid peeraddr) + } + +mkPeerMetrics :: MonadSTM m => m (PeerMetrics m sigid peeraddr) +mkPeerMetrics = + PeerMetrics <$> + newTVarIO PeerMetricsState { + sigUnchecked = Map.empty, + sigMetric = OrdPSQ.empty + } + +data IsValid = + -- | Sig was valid + Valid + | -- | Sig was not valid or was not received when we requested it. + NotValidOrNotReceived + +data ReportPeerMetrics' m sigid f = ReportPeerMetrics { + -- | report a new `sigid` + reportSigId :: Tracer (STM m) (f (sigid, Time)), + + -- | report a received `sig` + reportSig :: Tracer (STM m) (f (sigid, IsValid)) + } + +type ReportPeerMetricsI m sigid = ReportPeerMetrics' m sigid Identity +type ReportPeerMetrics m sigid peeraddr = ReportPeerMetrics' m sigid (TraceLabelPeer peeraddr) + +hoist :: (forall a. f a -> g a) + -> ReportPeerMetrics' m sigid g + -> ReportPeerMetrics' m sigid f +hoist nat + ReportPeerMetrics { + reportSigId, + reportSig + } + = + ReportPeerMetrics { + reportSigId = nat >$< reportSigId, + reportSig = nat >$< reportSig + } + + +nullMetrics :: Applicative (STM m) => ReportPeerMetrics' m sigid f +nullMetrics = ReportPeerMetrics { + reportSigId = nullTracer, + reportSig = nullTracer + } + +reportMetric + :: forall m sigid peeraddr. + ( MonadSTM m + , Ord sigid + , Ord peeraddr + ) + => PeerMetricsConfiguration + -> PeerMetrics m sigid peeraddr + -> ReportPeerMetrics m sigid peeraddr +reportMetric config peerMetrics = + ReportPeerMetrics { + reportSigId = sigIdMetricTracer peerMetrics, + reportSig = sigMetricTracer config peerMetrics + } + + +sigIdMetricTracer + :: forall m sigid peeraddr. + ( MonadSTM m + , Ord sigid + , Ord peeraddr + ) + => PeerMetrics m sigid peeraddr + -> Tracer (STM m) (TraceLabelPeer peeraddr (sigid, Time)) +sigIdMetricTracer + PeerMetrics { peerMetricsVar } + = + Tracer $ \(TraceLabelPeer peeraddr (sigid, time)) -> do + st@PeerMetricsState { sigUnchecked } <- readTVar peerMetricsVar + let sigUnchecked' = Map.alter (Just . fn peeraddr time) sigid sigUnchecked + writeTVar peerMetricsVar st { sigUnchecked = sigUnchecked' } + where + fn :: peeraddr + -> Time + -> Maybe (OrdPSQ peeraddr Time ()) + -> OrdPSQ peeraddr Time () + fn peeraddr time Nothing = OrdPSQ.singleton peeraddr time () + fn peeraddr time (Just psq) = snd $ OrdPSQ.alter gn peeraddr psq + where + gn :: Maybe (Time, ()) -> ((), Maybe (Time, ())) + gn Nothing = ((), Just (time, ())) + -- we store only the earliest time from the given peer + gn (Just (time', ())) = ((), Just (time' `min` time, ())) + + +sigMetricTracer + :: forall m sigid peeraddr. + ( MonadSTM m + , Ord sigid + , Ord peeraddr + ) + => PeerMetricsConfiguration + -> PeerMetrics m sigid peeraddr + -> Tracer (STM m) + (TraceLabelPeer peeraddr + ( sigid + , IsValid + )) +sigMetricTracer + PeerMetricsConfiguration { timeWindowToKeep } + PeerMetrics { peerMetricsVar } + = + Tracer $ \(TraceLabelPeer peeraddr (sigid, isValid)) -> do + st@PeerMetricsState { sigUnchecked, sigMetric } <- readTVar peerMetricsVar + case isValid of + NotValidOrNotReceived -> do + let notValidFn :: Maybe (OrdPSQ peeraddr Time v) + -> Maybe (OrdPSQ peeraddr Time v) + notValidFn Nothing = Nothing + notValidFn (Just psq) = + -- delete the entry and make sure we don't store empty `psq` in + -- the outer map + let psq' = OrdPSQ.delete peeraddr psq + in case OrdPSQ.findMin psq' of + Nothing -> Nothing + Just {} -> Just psq' + sigUnchecked' = Map.alter notValidFn sigid sigUnchecked + writeTVar peerMetricsVar st { sigUnchecked = sigUnchecked' } + + Valid -> do + let validFn :: Maybe (OrdPSQ peeraddr Time ()) + -> (Maybe Time, Maybe (OrdPSQ peeraddr Time ())) + validFn Nothing = (Nothing, Nothing) + validFn (Just psq) = + -- delete the entry, return time and make sure we don't store + -- empty `psq` in the outer map + let (mbTime', psq') = OrdPSQ.alter (\case + Nothing -> (Nothing, Nothing) + Just (time, _) -> (Just time, Nothing) + ) + peeraddr + psq + in case OrdPSQ.findMin psq' of + Nothing -> (mbTime', Nothing) + Just {} -> (mbTime', Just psq') + (mbTime, sigUnchecked') = Map.alterF validFn sigid sigUnchecked + + sigMetric' = case mbTime of + Nothing -> sigMetric + Just time -> snd $ OrdPSQ.alter + (\case + Nothing -> ((), Just (time, peeraddr)) + Just a@(time', _) -> + -- keep the earliest entry + if time' < time + then ((), Just a) + else ((), Just (time, peeraddr)) + ) + sigid + sigMetric + + sigMetric'' = + case mbTime of + Nothing -> sigMetric' + Just t -> snd $ OrdPSQ.atMostView ((- timeWindowToKeep) `addTime` t) sigMetric' + + writeTVar peerMetricsVar PeerMetricsState { + sigUnchecked = sigUnchecked', + sigMetric = sigMetric'' + } + + +erasePeerImpl + :: ( Ord peeraddr + , Ord sigid + ) + => peeraddr + -> PeerMetricsState sigid peeraddr + -> PeerMetricsState sigid peeraddr +erasePeerImpl + peeraddr + PeerMetricsState { + sigUnchecked, + sigMetric + } + = + PeerMetricsState { + sigUnchecked = + Map.map + (OrdPSQ.delete peeraddr) + sigUnchecked, + sigMetric = + OrdPSQ.fromList + . List.filter (\(_, _, peeraddr') -> peeraddr' /= peeraddr) + . OrdPSQ.toAscList + $ sigMetric + } + +-- | Erase a peer from `PeerMetric`. +-- +erasePeer + :: ( MonadSTM m + , Ord peeraddr + , Ord sigid + ) + => peeraddr + -> PeerMetrics m sigid peeraddr + -> m () +erasePeer peeraddr PeerMetrics { peerMetricsVar } = + atomically $ modifyTVar peerMetricsVar (erasePeerImpl peeraddr) + + +announcinessImpl + :: forall sigid peeraddr. Ord peeraddr + => PeerMetricsState sigid peeraddr + -> Map peeraddr Int +announcinessImpl PeerMetricsState { sigMetric } + = OrdPSQ.fold' count Map.empty sigMetric + where + count :: sigid + -> Time + -> peeraddr + -> Map peeraddr Int + -> Map peeraddr Int + count _ _ peeraddr m = + Map.alter fn peeraddr m + + fn :: Maybe Int -> Maybe Int + fn Nothing = Just 1 + fn (Just n) = Just (n + 1) + + +-- | Metric counters. +-- +announciness + :: forall m sigid peeraddr. + ( Ord peeraddr + , MonadSTM m + ) + => PeerMetrics m sigid peeraddr + -> STM m (Map peeraddr Int) +announciness PeerMetrics { peerMetricsVar } = + announcinessImpl <$> readTVar peerMetricsVar diff --git a/dmq-node/src/DMQ/Policy.hs b/dmq-node/src/DMQ/Policy.hs index 9a0a4df..d4ad9d4 100644 --- a/dmq-node/src/DMQ/Policy.hs +++ b/dmq-node/src/DMQ/Policy.hs @@ -1,12 +1,14 @@ -{-# LANGUAGE NumericUnderscores #-} - module DMQ.Policy ( sigDecisionPolicy , sigSubmissionIngressLimit + , peerMetricsConfiguration ) where +import DMQ.PeerSelection.PeerMetric (PeerMetricsConfiguration (..)) import DMQ.Protocol.SigSubmission.Type (NumTxIdsToReq) + import Network.Mux.Types (MiniProtocolLimits (..)) + import Ouroboros.Network.SizeInBytes (SizeInBytes) import Ouroboros.Network.TxSubmission.Inbound.V2 @@ -56,3 +58,6 @@ sigSubmissionIngressLimit = MiniProtocolLimits { addMargin :: Int -> Int addMargin = \x -> x + x `div` 10 + +peerMetricsConfiguration :: PeerMetricsConfiguration +peerMetricsConfiguration = PeerMetricsConfiguration { timeWindowToKeep = 3600 } diff --git a/dmq-node/src/DMQ/SigSubmissionV2/Inbound.hs b/dmq-node/src/DMQ/SigSubmissionV2/Inbound.hs index aab4532..58c9c18 100644 --- a/dmq-node/src/DMQ/SigSubmissionV2/Inbound.hs +++ b/dmq-node/src/DMQ/SigSubmissionV2/Inbound.hs @@ -6,12 +6,15 @@ {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE PackageImports #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TupleSections #-} module DMQ.SigSubmissionV2.Inbound ( -- * SigSubmision Inbound client sigSubmissionInbound ) where +import Data.Foldable (traverse_) +import Data.Functor.Identity (Identity (..)) import Data.Map.Strict qualified as Map import Data.Sequence.Strict qualified as StrictSeq import Data.Set qualified as Set @@ -21,6 +24,7 @@ import Control.Exception (assert) import Control.Monad (unless, when) import Control.Monad.Class.MonadAsync (MonadAsync (..)) import Control.Monad.Class.MonadThrow +import Control.Monad.Class.MonadTime.SI import "contra-tracer" Control.Tracer (Tracer, traceWith) import Network.TypedProtocol @@ -33,6 +37,8 @@ import Ouroboros.Network.TxSubmission.Inbound.V2 (PeerTxAPI (..), import Ouroboros.Network.TxSubmission.Inbound.V2.Types (TxSubmissionMempoolWriter (..)) +import DMQ.PeerSelection.PeerMetric (IsValid (..), ReportPeerMetrics' (..), + ReportPeerMetricsI) import DMQ.Protocol.SigSubmissionV2.Inbound import DMQ.Protocol.SigSubmissionV2.Type (NumIdsAck (NumIdsAck), NumIdsReq (..)) import DMQ.SigSubmissionV2.Types @@ -49,11 +55,12 @@ sigSubmissionInbound :: forall sigid sig idx m failure. ( MonadThrow m , MonadAsync m - , Ord sigid + , Ord sigid, MonadMonotonicTime m ) => Tracer m (TraceTxSubmissionInbound sigid sig) -> TxSubmissionMempoolWriter sigid sig idx m failure -> PeerTxAPI m sigid sig + -> ReportPeerMetricsI m sigid -> ControlMessageSTM m -> SigSubmissionInboundPipelined sigid sig m () sigSubmissionInbound @@ -65,6 +72,10 @@ sigSubmissionInbound handleReceivedTxs, submitTxToMempool } + ReportPeerMetrics { + reportSigId, + reportSig + } controlMessageSTM = SigSubmissionInboundPipelined inboundIdle @@ -110,6 +121,9 @@ sigSubmissionInbound -- * `TraceTxInboundAddedToMempool`, and -- * `TraceTxInboundRejectedFromMempool` -- events. + -- + -- NOTE: submitTxToMempool applies `reportSig` to validated + -- signatures mapM_ (uncurry $ submitTxToMempool tracer) listOfTxsToMempool -- TODO: @@ -152,10 +166,12 @@ sigSubmissionInbound (NumIdsAck . getNumTxIdsToAck $ sigIdsToAck) (NumIdsReq . getNumTxIdsToReq $ sigIdsToReq) (\sigids -> do + time <- getMonotonicTime let sigidsSeq = StrictSeq.fromList $ fst <$> sigids sigidsMap = Map.fromList sigids unless (StrictSeq.length sigidsSeq <= fromIntegral sigIdsToReq) $ throwIO ProtocolErrorSigIdsNotRequested + atomically $ traverse_ (\(sigid, _) -> traceWith reportSigId (Identity (sigid, time))) sigids handleReceivedTxIds sigIdsToReq sigidsSeq sigidsMap inboundIdle ) @@ -206,19 +222,26 @@ sigSubmissionInbound -> m (InboundStIdle n sigid sig m ()) handleReply k = \case CollectSigIds sigIdsToReq sigids -> do + time <- getMonotonicTime let sigidsSeq = StrictSeq.fromList $ fst <$> sigids sigidsMap = Map.fromList sigids unless (StrictSeq.length sigidsSeq <= fromIntegral sigIdsToReq) $ throwIO ProtocolErrorSigIdsNotRequested + atomically $ traverse_ (\(sigid, _) -> traceWith reportSigId (Identity (sigid, time))) sigids handleReceivedTxIds (NumTxIdsToReq . getNumIdsReq $ sigIdsToReq) sigidsSeq sigidsMap k CollectSigs sigids sigs -> do - let requested = Map.keysSet sigids - received = Map.fromList [ (txId sig, sig) | sig <- sigs ] + let requested = Map.keysSet sigids + received = Map.fromList [ (txId sig, sig) | sig <- sigs ] + notReceived = requested Set.\\ Map.keysSet received unless (Map.keysSet received `Set.isSubsetOf` requested) $ throwIO ProtocolErrorSigNotRequested + atomically do + traverse_ (traceWith reportSig . Identity . (,NotValidOrNotReceived)) + (Set.toList notReceived) + mbe <- handleReceivedTxs sigids received traceWith tracer $ TraceTxSubmissionCollected (txId `map` sigs) case mbe of diff --git a/dmq-node/test/Test/DMQ/SigSubmission/App.hs b/dmq-node/test/Test/DMQ/SigSubmission/App.hs index 1d9a2cb..66a6aee 100644 --- a/dmq-node/test/Test/DMQ/SigSubmission/App.hs +++ b/dmq-node/test/Test/DMQ/SigSubmission/App.hs @@ -46,6 +46,7 @@ import Ouroboros.Network.Protocol.TxSubmission2.Type (NumTxIdsToReq (..)) import Ouroboros.Network.TxSubmission.Inbound.V2 import Ouroboros.Network.Util.ShowProxy +import DMQ.PeerSelection.PeerMetric qualified as PeerMetric import DMQ.Protocol.SigSubmissionV2.Codec (byteLimitsSigSubmissionV2, timeLimitsSigSubmissionV2) import DMQ.Protocol.SigSubmissionV2.Inbound @@ -314,6 +315,7 @@ runSigSubmissionV2 tracer tracerSigLogic st0 sigDecisionPolicy = do verboseTracer (getMempoolWriter duplicateSigsVar inboundMempool) api + PeerMetric.nullMetrics ctrlMsgSTM runPipelinedPeerWithLimits (("INBOUND " ++ show addr,) `contramap` verboseTracer)