From a5dde117a9725baf95379853746ee7ab1077c8a3 Mon Sep 17 00:00:00 2001 From: Francisco Guerrero Date: Tue, 16 Jun 2026 16:09:35 -0500 Subject: [PATCH] CEP-45: Avoid spamming with the same request during background reconciliation patch by Francisco Guerrero; reviewed by TBD for CASSANDRA-21387 --- .../config/MutationTrackingSpec.java | 6 + .../metrics/MutationTrackingMetrics.java | 6 + .../replication/MutationTrackingService.java | 126 ++++++++++++++++-- .../MutationTrackingServiceMBean.java | 13 ++ .../org/apache/cassandra/tools/NodeProbe.java | 10 ++ .../cassandra/tools/nodetool/MTAdmin.java | 6 +- .../test/tracking/MutationTrackingTest.java | 123 +++++++++++++++++ 7 files changed, 279 insertions(+), 11 deletions(-) diff --git a/src/java/org/apache/cassandra/config/MutationTrackingSpec.java b/src/java/org/apache/cassandra/config/MutationTrackingSpec.java index 30f8acaef52..8d2da0d2fa8 100644 --- a/src/java/org/apache/cassandra/config/MutationTrackingSpec.java +++ b/src/java/org/apache/cassandra/config/MutationTrackingSpec.java @@ -30,4 +30,10 @@ public class MutationTrackingSpec * The interval in which the backgroun reconciliation process runs */ public volatile DurationSpec.LongMillisecondsBound background_reconciliation_interval = new DurationSpec.LongMillisecondsBound("1s"); + /** + * Minimum time to wait before re-issuing a pull request for the same coordinator log + * during background reconciliation. Decoupled from {@link #background_reconciliation_interval} + * so that suppression spans multiple scheduler ticks. + */ + public volatile DurationSpec.LongMillisecondsBound background_reconciliation_request_cooldown = new DurationSpec.LongMillisecondsBound("3s"); } diff --git a/src/java/org/apache/cassandra/metrics/MutationTrackingMetrics.java b/src/java/org/apache/cassandra/metrics/MutationTrackingMetrics.java index 9935a42bd4c..7ea4dbefa54 100644 --- a/src/java/org/apache/cassandra/metrics/MutationTrackingMetrics.java +++ b/src/java/org/apache/cassandra/metrics/MutationTrackingMetrics.java @@ -45,6 +45,9 @@ public static MutationTrackingMetrics instance() public final Counter broadcastOffsetsDiscovered; // Newly-witnessed offsets discovered via broadcast public final Counter writeTimeOffsetsDiscovered; // Newly-witnessed offsets discovered at write time + public final Counter backgroundPullRequestsSent; // PullMutationsRequest messages issued by background reconciliation + public final Counter backgroundPullRequestsSuppressed; // PullMutationsRequest sends skipped by the per-coordinator-log cooldown + public final Counter backgroundPullRequestsFailed; // PullMutationsRequest sends that failed before transmission (overload, serialization, closed connection) public final Histogram readSummarySize; // Read summary sizes public final Gauge unreconciledMutationCount; // Number of unreconciled mutations public final Gauge journalDiskSpaceUsed; // Size of MutationJournal on disk @@ -55,6 +58,9 @@ private MutationTrackingMetrics() { broadcastOffsetsDiscovered = Metrics.counter(factory.createMetricName("BroadcastOffsetsDiscovered")); writeTimeOffsetsDiscovered = Metrics.counter(factory.createMetricName("WriteTimeOffsetsDiscovered")); + backgroundPullRequestsSent = Metrics.counter(factory.createMetricName("BackgroundPullRequestsSent")); + backgroundPullRequestsSuppressed = Metrics.counter(factory.createMetricName("BackgroundPullRequestsSuppressed")); + backgroundPullRequestsFailed = Metrics.counter(factory.createMetricName("BackgroundPullRequestsFailed")); readSummarySize = Metrics.histogram(factory.createMetricName("ReadSummarySize"), true); unreconciledMutationCount = Metrics.register( factory.createMetricName("UnreconciledMutationCount"), diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingService.java b/src/java/org/apache/cassandra/replication/MutationTrackingService.java index db87a88f010..b4a11e98744 100644 --- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java +++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java @@ -65,13 +65,17 @@ import org.apache.cassandra.dht.Splitter; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.RequestFailure; +import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.locator.EndpointsForRange; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.metrics.MutationTrackingMetrics; import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessageFlag; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.NoPayload; +import org.apache.cassandra.net.RequestCallback; import org.apache.cassandra.net.Verb; import org.apache.cassandra.repair.SyncTask; import org.apache.cassandra.repair.SyncTasks; @@ -89,6 +93,7 @@ import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.ownership.ReplicaGroups; import org.apache.cassandra.tcm.ownership.VersionedEndpoints; +import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MBeanWrapper; @@ -255,6 +260,8 @@ private synchronized void startInternal(Function lastRequestedAt = new ConcurrentHashMap<>(); + void start() { scheduleNext(); @@ -1492,16 +1533,28 @@ private void runAndReschedule() void run() { - MutationTrackingService.instance().forEachKeyspace(this::run); - } + long now = Clock.Global.nanoTime(); + long cooldownNanos = TimeUnit.MILLISECONDS.toNanos(config.background_reconciliation_request_cooldown.toMilliseconds()); - private void run(KeyspaceShards shards) - { if (config.background_reconciliation_enabled) - shards.forEachShard(this::run); + { + MutationTrackingService.instance() + .forEachKeyspace(shards -> shards.forEachShard(shard -> run(shard, now, cooldownNanos))); + } + else if (!lastRequestedAt.isEmpty()) + { + // When the background reconciliation is disabled, we clean up any pending + // requests being tracked for the cool down period + lastRequestedAt.clear(); + } + + if (!lastRequestedAt.isEmpty()) + { + lastRequestedAt.values().removeIf(timestamp -> now - timestamp > cooldownNanos); + } } - private void run(Shard shard) + private void run(Shard shard, long now, long cooldownNanos) { try { @@ -1510,8 +1563,16 @@ private void run(Shard shard) for (Offsets.Immutable offsets : missing) { - // Prefer pulling from the coordinator - int coordinatorHostId = offsets.logId().hostId(); + CoordinatorLogId logId = offsets.logId(); + + Long lastRequested = lastRequestedAt.get(logId); + if (lastRequested != null && now - lastRequested < cooldownNanos) + { + MutationTrackingMetrics.instance().backgroundPullRequestsSuppressed.inc(); + continue; + } + + int coordinatorHostId = logId.hostId(); InetAddressAndPort coordinator = ClusterMetadata.current().directory.endpoint(new NodeId(coordinatorHostId)); InetAddressAndPort pullFrom = FailureDetector.instance.isAlive(coordinator) ? coordinator @@ -1523,10 +1584,12 @@ private void run(Shard shard) continue; // No reachable source } - // TODO (expected): backoff, rate limits, per host and total PullMutationsRequest request = new PullMutationsRequest(offsets, ActiveLogReconciler.Priority.REGULAR); logger.trace("Requesting pull mutation request from replica {} for missing offset {}", pullFrom, offsets); - MessagingService.instance().send(Message.out(Verb.MT_PULL_MUTATIONS_REQ, request), pullFrom); + Message message = Message.outWithFlag(Verb.MT_PULL_MUTATIONS_REQ, request, MessageFlag.CALL_BACK_ON_FAILURE); + MessagingService.instance().sendWithCallback(message, pullFrom, new PullRequestFailureCallback(logId, now)); + lastRequestedAt.put(logId, now); + MutationTrackingMetrics.instance().backgroundPullRequestsSent.inc(); } } catch (Throwable throwable) @@ -1550,6 +1613,49 @@ private InetAddressAndPort findAliveReplica(Shard shard, int excludeHostId) } return null; } + + /** + * MT_PULL_MUTATIONS_REQ is a one-way verb (no response). The callback registered with + * {@link MessagingService#sendWithCallback} fires on send-layer failures (queue overload, + * serialization error, closed connection) and on the eventual TIMEOUT after + * {@code readTimeout} expires. We treat TIMEOUT as benign (a one-way verb never produces + * a response) and only react to genuine send failures by clearing the cooldown entry, + * so the next reconciliation tick can retry without waiting for the cooldown to expire. + */ + private final class PullRequestFailureCallback implements RequestCallback + { + private final CoordinatorLogId logId; + private final long sentAt; + + PullRequestFailureCallback(CoordinatorLogId logId, long sentAt) + { + this.logId = logId; + this.sentAt = sentAt; + } + + @Override + public void onResponse(Message msg) + { + // MT_PULL_MUTATIONS_REQ is one-way; no response is expected. + } + + @Override + public boolean invokeOnFailure() + { + return true; + } + + @Override + public void onFailure(InetAddressAndPort from, RequestFailure failure) + { + if (failure.reason == RequestFailureReason.TIMEOUT) + return; // expected for a one-way verb + // Only remove if our entry is still the one in place; a newer reconcile may have + // already overwritten it. + lastRequestedAt.remove(logId, sentAt); + MutationTrackingMetrics.instance().backgroundPullRequestsFailed.inc(); + } + } } // TODO (later): a more intelligent heuristic for offsets included in broadcasts diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingServiceMBean.java b/src/java/org/apache/cassandra/replication/MutationTrackingServiceMBean.java index a64f09da2be..9d3b5f8b57d 100644 --- a/src/java/org/apache/cassandra/replication/MutationTrackingServiceMBean.java +++ b/src/java/org/apache/cassandra/replication/MutationTrackingServiceMBean.java @@ -46,4 +46,17 @@ public interface MutationTrackingServiceMBean * @return the interval, in milliseconds, in which the background reconciliation runs when enabled */ long getMutationTrackingBackgroundReconciliationIntervalMilliseconds(); + + /** + * Sets the minimum delay (in milliseconds) between successive pull requests issued by + * background reconciliation for the same coordinator log. + * + * @param cooldownMilliseconds the cooldown value in milliseconds + */ + void setMutationTrackingBackgroundReconciliationRequestCooldownMilliseconds(long cooldownMilliseconds); + + /** + * @return the per-coordinator-log request cooldown, in milliseconds, used by background reconciliation + */ + long getMutationTrackingBackgroundReconciliationRequestCooldownMilliseconds(); } diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index c4d0d174005..1d4c84c5847 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -2879,6 +2879,16 @@ public void setMutationTrackingBackgroundReconciliationIntervalMilliseconds(long { mutationTrackingProxy.setMutationTrackingBackgroundReconciliationIntervalMilliseconds(intervalMilliseconds); } + + public long getMutationTrackingBackgroundReconciliationRequestCooldownMilliseconds() + { + return mutationTrackingProxy.getMutationTrackingBackgroundReconciliationRequestCooldownMilliseconds(); + } + + public void setMutationTrackingBackgroundReconciliationRequestCooldownMilliseconds(long cooldownMilliseconds) + { + mutationTrackingProxy.setMutationTrackingBackgroundReconciliationRequestCooldownMilliseconds(cooldownMilliseconds); + } } class ColumnFamilyStoreMBeanIterator implements Iterator> diff --git a/src/java/org/apache/cassandra/tools/nodetool/MTAdmin.java b/src/java/org/apache/cassandra/tools/nodetool/MTAdmin.java index 5e8e3ea50ab..3c6d1c715ce 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/MTAdmin.java +++ b/src/java/org/apache/cassandra/tools/nodetool/MTAdmin.java @@ -62,6 +62,7 @@ public void execute(NodeProbe probe) out.println("background_reconciliation_enabled: " + probe.getMutationTrackingBackgroundReconciliationEnabled()); out.println("background_reconciliation_interval_ms: " + probe.getMutationTrackingBackgroundReconciliationIntervalMilliseconds()); + out.println("background_reconciliation_request_cooldown_ms: " + probe.getMutationTrackingBackgroundReconciliationRequestCooldownMilliseconds()); } } @@ -73,7 +74,7 @@ public static class SetConfig extends AbstractCommand @Parameters(index = "0", arity = "0..1", description = { "Mutation tracking param type.", "Possible parameters: " + - "[background_reconciliation_enabled|background_reconciliation_interval_ms]" }) + "[background_reconciliation_enabled|background_reconciliation_interval_ms|background_reconciliation_request_cooldown_ms]" }) public String paramType; @Parameters(index = "1", description = "Mutation tracking param value", arity = "0..1") @@ -104,6 +105,9 @@ public void execute(NodeProbe probe) case "background_reconciliation_interval_ms": probe.setMutationTrackingBackgroundReconciliationIntervalMilliseconds(Long.parseLong(value)); break; + case "background_reconciliation_request_cooldown_ms": + probe.setMutationTrackingBackgroundReconciliationRequestCooldownMilliseconds(Long.parseLong(value)); + break; default: throw new IllegalArgumentException("Unknown parameter: " + type); } diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java index b95be6c1464..c8eb9fb7415 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java @@ -20,6 +20,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; import org.junit.Assert; import org.junit.Ignore; @@ -38,6 +39,7 @@ import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.hints.HintsService; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.metrics.MutationTrackingMetrics; import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.net.Verb; import org.apache.cassandra.replication.CoordinatorLogId; @@ -857,4 +859,125 @@ public void testBackgroundPullReconciliationWhenCoordinatorDown() throws Throwab assertEquals(0, numLogReconciliations(cluster.get(3))); } } + + @Test + public void testBackgroundReconciliationCooldown() throws Throwable + { + try (Cluster cluster = Cluster.build(3) + .withConfig(cfg -> cfg.with(Feature.NETWORK) + .with(Feature.GOSSIP) + .set("write_request_timeout", "1000ms")) + .start()) + { + cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='tracked'")); + cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (k int PRIMARY KEY, v int)")); + + // 1. Create a missing offset on node 3 using the same recipe as + // testBackgroundPullReconciliation: partition node 3, write at QUORUM, then + // reconnect and broadcast offsets so node 3 learns about the gap. + cluster.filters().allVerbs().to(3).drop(); + cluster.filters().allVerbs().from(3).drop(); + for (int i = 1; i <= 2; i++) + cluster.get(i).runOnInstance(() -> Gossiper.instance.convict(InetAddressAndPort.getByNameUnchecked("127.0.0.3"), Double.MAX_VALUE)); + + for (int i = 1; i <= 3; i++) + cluster.get(i).runOnInstance(() -> { + MutationTrackingService.instance().pauseActiveReconciler(); + MutationTrackingService.instance().pauseBackgroundReconciler(); + }); + + awaitNodeDead(cluster.get(1), cluster.get(3)); + + cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (k, v) VALUES (1, 1)"), + ConsistencyLevel.QUORUM); + TimeUnit.SECONDS.sleep(1); + + cluster.filters().reset(); + awaitNodeAlive(cluster.get(1), cluster.get(3)); + awaitNodeAlive(cluster.get(3), cluster.get(1)); + + for (int i = 1; i <= 3; i++) + cluster.get(i).runOnInstance(() -> MutationTrackingService.instance().broadcastOffsetsForTesting()); + + // 2. Use a short request cooldown on node 3 so the test is fast, and keep it + // distinct from the schedule interval so the cooldown is actually exercised + // (not aliased onto the scheduling cadence). + // + // NOTE: we deliberately leave the active reconciler PAUSED on node 1 (and 2) + // so the pull requests we send from node 3 are never served — the missing + // offsets stay missing across phases, and each reconcileForTesting() that + // isn't suppressed by the cooldown produces a fresh outbound pull request. + long cooldownMs = 500; + cluster.get(3).runOnInstance(() -> + MutationTrackingService.instance().setMutationTrackingBackgroundReconciliationRequestCooldownMilliseconds(cooldownMs)); + + // 3. Use the BackgroundPullRequestsSent metric on node 3 as the source of truth + // for outbound pull request counts — it's incremented in lock-step with each + // actual send and avoids needing a separate counting message filter. + IInvokableInstance node3 = cluster.get(3); + LongSupplier sentCount = () -> + node3.callOnInstance(() -> MutationTrackingMetrics.instance().backgroundPullRequestsSent.getCount()); + + // === Phase 1: rapid-fire dedup === + // Two reconcileForTesting calls back-to-back within the cooldown window must + // produce only ONE outbound pull request. + node3.runOnInstance(() -> { + MutationTrackingService.instance().resumeBackgroundReconciler(); + MutationTrackingService.instance().reconcileForTesting(); + MutationTrackingService.instance().reconcileForTesting(); + MutationTrackingService.instance().pauseBackgroundReconciler(); + }); + TimeUnit.MILLISECONDS.sleep(200); + assertEquals("Rapid-fire dedup: only the first reconcile should send a request", + 1L, sentCount.getAsLong()); + // Verify the suppression code path was actually taken on the second call. + long suppressedAfterPhase1 = node3.callOnInstance(() -> + MutationTrackingMetrics.instance().backgroundPullRequestsSuppressed.getCount()); + assertTrue("Cooldown suppression metric should advance when a duplicate reconcile is suppressed", + suppressedAfterPhase1 >= 1); + + // === Phase 2: cooldown expires, allowing a fresh request === + // Sleep longer than the configured cooldown. While paused, the scheduled task + // fires and exercises the disable-clear branch (and the time-based removeIf), + // so the previous entry is gone by the time we manually reconcile again. + TimeUnit.MILLISECONDS.sleep(cooldownMs + 200); + node3.runOnInstance(() -> { + MutationTrackingService.instance().resumeBackgroundReconciler(); + MutationTrackingService.instance().reconcileForTesting(); + MutationTrackingService.instance().pauseBackgroundReconciler(); + }); + assertEquals("Post-cooldown: a fresh reconcile should send a request once the cooldown elapses", + 2L, sentCount.getAsLong()); + + // === Phase 3: disabling reconciliation clears tracked state === + // Phase 2 left a fresh entry in lastRequestedAt. Without sleeping past the + // cooldown, invoking run() while disabled takes the disable-clear branch and + // wipes the map. The next reconcile (after re-enabling) should then send a + // request even though we are still inside Phase 2's cooldown window. + node3.runOnInstance(() -> { + // Already paused at the end of Phase 2 — explicitly run() to exercise the + // disable-clear branch with the entry still inside the cooldown window. + MutationTrackingService.instance().reconcileForTesting(); + MutationTrackingService.instance().resumeBackgroundReconciler(); + MutationTrackingService.instance().reconcileForTesting(); + MutationTrackingService.instance().pauseBackgroundReconciler(); + }); + TimeUnit.MILLISECONDS.sleep(200); + // Without the disable-clear branch, the Phase 2 entry would still be within + // its cooldown and would suppress this reconcile. + assertEquals("Disabling should clear tracked-request state and allow a fresh send within the cooldown window", + 3L, sentCount.getAsLong()); + + // Sanity: the happy path should never count a send-layer failure. The callback + // we install on each pull request only counts non-TIMEOUT failures (queue overload, + // serialization, closed connection); the eventual TIMEOUT for an unanswered one-way + // request is benign and must not advance this counter. + long failedAtEnd = node3.callOnInstance(() -> + MutationTrackingMetrics.instance().backgroundPullRequestsFailed.getCount()); + assertEquals("Happy-path test should not record any send-layer failures", + 0, failedAtEnd); + } + } }