From 6e5e2b408b705bbfe06903eb649d1ef2005dc717 Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Mon, 30 Mar 2026 17:20:58 -0600 Subject: [PATCH 1/4] Move long running TCM operations to a longer timout Replaces the fixed-retry commit loop with deadline-based exponential backoff for long running CMS commit operations (cms_commit_timeout=1h, 5s-60s jitter) to allow heavily contended CMS nodes time to commit transforms. Patch by Jon Meredith and Sam Tunnicliffe; reviewed by Jon Meredith and Sam Tunnicliffe for CASSANDRA-21453 Co-authored-by: Sam Tunnicliffe --- .../org/apache/cassandra/config/Config.java | 45 ++++ .../cassandra/config/DatabaseDescriptor.java | 51 ++++ src/java/org/apache/cassandra/net/Verb.java | 1 + .../cassandra/service/RetryStrategy.java | 2 + .../cassandra/tcm/AbstractLocalProcessor.java | 23 +- .../apache/cassandra/tcm/CMSOperations.java | 62 +++++ .../cassandra/tcm/CMSOperationsMBean.java | 26 +- .../apache/cassandra/tcm/ClusterMetadata.java | 13 + .../cassandra/tcm/ClusterMetadataService.java | 73 +++++- src/java/org/apache/cassandra/tcm/Commit.java | 34 ++- .../org/apache/cassandra/tcm/Processor.java | 40 --- .../apache/cassandra/tcm/RemoteProcessor.java | 22 +- src/java/org/apache/cassandra/tcm/Retry.java | 23 ++ .../tcm/sequences/BootstrapAndJoin.java | 4 +- .../tcm/sequences/BootstrapAndReplace.java | 4 +- .../apache/cassandra/tcm/sequences/Move.java | 3 + .../tcm/sequences/UnbootstrapAndLeave.java | 4 +- .../InProgressSequenceCoordinationTest.java | 7 +- .../test/tcm/LostCommitReqResTest.java | 33 ++- ...teCmsCommitTimeoutDifferentiationTest.java | 240 +++++++++++++++++ .../cassandra/tcm/CommitTimeoutPathsTest.java | 242 ++++++++++++++++++ .../org/apache/cassandra/tcm/RetryTest.java | 111 ++++++-- 22 files changed, 971 insertions(+), 92 deletions(-) create mode 100644 test/distributed/org/apache/cassandra/distributed/test/tcm/RemoteCmsCommitTimeoutDifferentiationTest.java create mode 100644 test/unit/org/apache/cassandra/tcm/CommitTimeoutPathsTest.java diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 82ee06480298..5a45073ac548 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -183,6 +183,18 @@ public static Set splitCommaDelimited(String src) public volatile DurationSpec.LongMillisecondsBound stream_transfer_task_timeout = new DurationSpec.LongMillisecondsBound("12h"); + /** + * Timeout for the per-message window when a non-CMS node sends a TCM_COMMIT_REQ to a CMS node. + * The CMS will attempt Paxos retries for (cms_await_timeout - write_request_timeout) before + * returning an explicit failure, giving the sender time to reschedule before its message callback fires. + * + * WARNING: cms_await_timeout should be substantially larger than write_request_timeout. + * A single Paxos CAS attempt can take up to (cas_contention_timeout + write_request_timeout) to + * complete. If cms_await_timeout is set close to write_request_timeout the deadline reduction has + * no effect and many concurrent CMS operations timing out at the same time may create a thundering herd, + * all retrying against the CMS. + * Default 2 minutes to match "nodetool cms initialize". + */ public volatile DurationSpec.LongMillisecondsBound cms_await_timeout = new DurationSpec.LongMillisecondsBound("120000ms"); public volatile int cms_default_max_retries = 10; @Deprecated(since="6.0") @@ -192,6 +204,39 @@ public static Set splitCommaDelimited(String src) public String cms_retry_delay = "50ms*attempts <= 500ms ... 100ms*attempts <= 1s,retries=10"; public volatile CMSCommitMemberPreferencePolicy cms_commit_member_preference_policy = CMSCommitMemberPreferencePolicy.random; + /** + * Controls the sender-side retry behavior for CMS commits (topology changes, + * CMS reconfiguration, node registration — everything except STARTUP and schema DDL + * with an explicit client deadline). + * + * cms_commit_timeout: Overall deadline for the commit to succeed. The sender retries + * with exponential backoff until this deadline expires. Each retry sends a fresh + * TCM_COMMIT_REQ to a (possibly different) CMS node. Set this longer than the + * expected total time for all concurrent operations to drain through the Paxos log. + * + * cms_commit_retry_initial_delay: Base delay for Full Jitter exponential backoff. + * Actual delay per retry = uniform_random(0, min(max_delay, initial_delay * 2^attempt)). + * Higher values reduce Paxos contention at the cost of slower progress when the log + * is lightly loaded. 5s is a good default — it spaces retries enough to avoid + * thundering herd while still making progress within minutes. + * + * cms_commit_retry_max_delay: Cap on the exponential backoff. Once 2^attempt growth + * exceeds this, all subsequent retries draw from uniform_random(0, max_delay). + * 60s keeps retries frequent enough that a freed Paxos slot is filled within + * ~30s on average, while avoiding retry storms. + * + * When to change: + * - If bulk topology ops complete but take too long: reduce max_delay (e.g. 30s) + * to retry more aggressively. Monitor CommitRetries rate for contention impact. + * - If bulk topology ops fail (timeout): increase cms_commit_timeout. + * - If Paxos contention is extremely high (>100 concurrent commits): increase + * initial_delay to 10-15s to spread the retry wavefront. + * - All three are hot-settable via JMX without restart. + */ + public volatile DurationSpec.LongMillisecondsBound cms_commit_timeout = new DurationSpec.LongMillisecondsBound("1h"); + public volatile DurationSpec.LongMillisecondsBound cms_commit_retry_initial_delay = new DurationSpec.LongMillisecondsBound("5s"); + public volatile DurationSpec.LongMillisecondsBound cms_commit_retry_max_delay = new DurationSpec.LongMillisecondsBound("60s"); + public volatile int epoch_aware_debounce_inflight_tracker_max_size = 100; /** diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 1f818a827606..fe855721598e 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -6167,6 +6167,15 @@ public static Config.CMSCommitMemberPreferencePolicy getCmsCommitMemberPreferenc return conf.cms_commit_member_preference_policy; } + public static void setCmsAwaitTimeout(long timeoutInMillis) + { + if (timeoutInMillis != conf.cms_await_timeout.to(TimeUnit.MILLISECONDS)) + { + logger.info("Setting cms_await_timeout to {}ms", timeoutInMillis); + conf.cms_await_timeout = new DurationSpec.LongMillisecondsBound(timeoutInMillis); + } + } + public static void setCmsCommitMemberPreferencePolicy(Config.CMSCommitMemberPreferencePolicy policy) { conf.cms_commit_member_preference_policy = policy; @@ -6177,6 +6186,48 @@ public static void setCmsCommitMemberPreferencePolicy(String policy) setCmsCommitMemberPreferencePolicy(Config.CMSCommitMemberPreferencePolicy.valueOf(toLowerCaseLocalized(policy))); } + public static DurationSpec getCmsCommitTimeout() + { + return conf.cms_commit_timeout; + } + + public static void setCmsCommitTimeout(long timeoutInMillis) + { + if (timeoutInMillis != conf.cms_commit_timeout.to(TimeUnit.MILLISECONDS)) + { + logger.info("Setting cms_commit_timeout to {}ms", timeoutInMillis); + conf.cms_commit_timeout = new DurationSpec.LongMillisecondsBound(timeoutInMillis); + } + } + + public static DurationSpec getCmsCommitRetryInitialDelay() + { + return conf.cms_commit_retry_initial_delay; + } + + public static void setCmsCommitRetryInitialDelay(long delayInMillis) + { + if (delayInMillis != conf.cms_commit_retry_initial_delay.to(TimeUnit.MILLISECONDS)) + { + logger.info("Setting cms_commit_retry_initial_delay to {}ms", delayInMillis); + conf.cms_commit_retry_initial_delay = new DurationSpec.LongMillisecondsBound(delayInMillis); + } + } + + public static DurationSpec getCmsCommitRetryMaxDelay() + { + return conf.cms_commit_retry_max_delay; + } + + public static void setCmsCommitRetryMaxDelay(long delayInMillis) + { + if (delayInMillis != conf.cms_commit_retry_max_delay.to(TimeUnit.MILLISECONDS)) + { + logger.info("Setting cms_commit_retry_max_delay to {}ms", delayInMillis); + conf.cms_commit_retry_max_delay = new DurationSpec.LongMillisecondsBound(delayInMillis); + } + } + public static int getEpochAwareDebounceInFlightTrackerMaxSize() { return conf.epoch_aware_debounce_inflight_tracker_max_size; diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java index 1bb3b7a8900a..f0721fe997ff 100644 --- a/src/java/org/apache/cassandra/net/Verb.java +++ b/src/java/org/apache/cassandra/net/Verb.java @@ -301,6 +301,7 @@ public enum Verb // transactional cluster metadata TCM_COMMIT_RSP (801, P0, rpcTimeout, INTERNAL_METADATA, MessageSerializers::commitResultSerializer, RESPONSE_HANDLER ), + // message timeout is overridden in RemoteProcessor to cmsAwaitTimeout for non-client facing commit requests TCM_COMMIT_REQ (802, P0, rpcTimeout, INTERNAL_METADATA, MessageSerializers::commitSerializer, () -> commitRequestHandler(), TCM_COMMIT_RSP ), TCM_FETCH_CMS_LOG_RSP (803, P0, shortTimeout, FETCH_METADATA, MessageSerializers::logStateSerializer, RESPONSE_HANDLER ), TCM_FETCH_CMS_LOG_REQ (804, P0, rpcTimeout, FETCH_METADATA, () -> FetchCMSLog.serializer, () -> fetchLogRequestHandler(), TCM_FETCH_CMS_LOG_RSP ), diff --git a/src/java/org/apache/cassandra/service/RetryStrategy.java b/src/java/org/apache/cassandra/service/RetryStrategy.java index e11c507bf575..19f85a2c5b8f 100644 --- a/src/java/org/apache/cassandra/service/RetryStrategy.java +++ b/src/java/org/apache/cassandra/service/RetryStrategy.java @@ -241,6 +241,8 @@ public long computeWait(int attempt, TimeUnit units) if (min > maxMinMicros) min = maxMinMicros; long max = this.max.getMicros(attempt); + if (max > maxMaxMicros) + max = maxMaxMicros; result = min >= max ? min : waitRandomizer.wait(min, max, attempt); } diff --git a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java index a245377f9171..41c7ebbeb503 100644 --- a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java +++ b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java @@ -29,8 +29,10 @@ import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; +import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.cassandra.exceptions.ExceptionCode.INVALID; import static org.apache.cassandra.exceptions.ExceptionCode.SERVER_ERROR; +import static org.apache.cassandra.utils.Clock.Global.nanoTime; public abstract class AbstractLocalProcessor implements Processor { @@ -50,6 +52,9 @@ public AbstractLocalProcessor(LocalLog log) @Override public final Commit.Result commit(Entry.Id entryId, Transformation transform, final Epoch lastKnown, Retry retryPolicy) { + String transformStr = transform.toString(); // convert once as idempotent and used in multiple logs + logger.debug("Starting local commit of {} with policy {}", transformStr, retryPolicy); + long commitStart = nanoTime(); while (!retryPolicy.hasExpired()) { ClusterMetadata previous = log.waitForHighestConsecutive(); @@ -66,7 +71,7 @@ public final Commit.Result commit(Entry.Id entryId, Transformation transform, fi Transformation.Result result; if (!transform.eligibleToCommit(previous)) { - result = new Transformation.Rejected(INVALID, "Transformation rejected, can't commit " + transform + + result = new Transformation.Rejected(INVALID, "Transformation rejected, can't commit " + transformStr + " it not supported with cluster common serialization version " + previous.directory.commonSerializationVersion + " and min/max serialization versions " + previous.directory.clusterMinVersion + "/" + previous.directory.clusterMaxVersion); } @@ -96,12 +101,18 @@ public final Commit.Result commit(Entry.Id entryId, Transformation transform, fi { Epoch nextEpoch = result.success().metadata.epoch; // If metadata applies, try committing it to the log + long casStart = nanoTime(); boolean applied = tryCommitOne(entryId, transform, previous.epoch, nextEpoch); + long casElapsedUs = NANOSECONDS.toMicros(nanoTime() - casStart); + logger.debug("tryCommitOne for {} epoch {}->{}: applied={}, took {}us", + transform.kind(), previous.epoch, nextEpoch, applied, casElapsedUs); // Application here semantially means "succeeded in committing to the distributed log". if (applied) { - logger.info("Committed {}. New epoch is {}", transform, nextEpoch); + logger.info("Committed {}. New epoch is {}. Took {} attempts in {}us total.", + transformStr, nextEpoch, retryPolicy.attempts(), + NANOSECONDS.toMicros(nanoTime() - commitStart)); log.append(new Entry(entryId, nextEpoch, new Transformation.Executed(transform, result))); log.awaitAtLeast(nextEpoch); @@ -124,8 +135,12 @@ public final Commit.Result commit(Entry.Id entryId, Transformation transform, fi break; } } - return Commit.Result.failed(SERVER_ERROR, - String.format("Could not perform commit; policy %s gave up", retryPolicy)); + long remainingMillis = NANOSECONDS.toMillis(retryPolicy.remainingNanos()); + String failureMsg = String.format("Could not perform commit after %d attempts. Time remaining: %dms", + retryPolicy.attempts(), remainingMillis); + logger.debug("Commit {} failed in {}us total. {}", + transformStr, NANOSECONDS.toMicros(nanoTime() - commitStart), failureMsg); + return Commit.Result.failed(SERVER_ERROR, failureMsg); } public Commit.Result maybeFailure(Entry.Id entryId, Epoch lastKnown, Supplier orElse) diff --git a/src/java/org/apache/cassandra/tcm/CMSOperations.java b/src/java/org/apache/cassandra/tcm/CMSOperations.java index d2b18ce36122..460513fbf58b 100644 --- a/src/java/org/apache/cassandra/tcm/CMSOperations.java +++ b/src/java/org/apache/cassandra/tcm/CMSOperations.java @@ -27,6 +27,8 @@ import java.util.Map; import java.util.stream.Collectors; +import com.google.common.base.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +51,7 @@ import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MBeanWrapper; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration.needsReconfiguration; public class CMSOperations implements CMSOperationsMBean @@ -80,6 +83,65 @@ private CMSOperations(ClusterMetadataService cms) this.cms = cms; } + // TCM CMS await timeout + public long getCmsAwaitTimeoutMillis() + { + return DatabaseDescriptor.getCmsAwaitTimeout().to(MILLISECONDS); + } + + public void setCmsAwaitTimeoutMillis(long timeoutInMillis) + { + Preconditions.checkState(timeoutInMillis > 0); + DatabaseDescriptor.setCmsAwaitTimeout(timeoutInMillis); + } + + // CMS commit timeout with exponential backoff + public long getCmsCommitTimeoutMillis() + { + return DatabaseDescriptor.getCmsCommitTimeout().to(MILLISECONDS); + } + + public void setCmsCommitTimeoutMillis(long timeoutInMillis) + { + Preconditions.checkState(timeoutInMillis > 0); + DatabaseDescriptor.setCmsCommitTimeout(timeoutInMillis); + } + + public long getCmsCommitRetryInitialDelayMillis() + { + return DatabaseDescriptor.getCmsCommitRetryInitialDelay().to(MILLISECONDS); + } + + public void setCmsCommitRetryInitialDelayMillis(long delayInMillis) + { + Preconditions.checkState(delayInMillis > 0); + DatabaseDescriptor.setCmsCommitRetryInitialDelay(delayInMillis); + } + + public long getCmsCommitRetryMaxDelayMillis() + { + return DatabaseDescriptor.getCmsCommitRetryMaxDelay().to(MILLISECONDS); + } + + public void setCmsCommitRetryMaxDelayMillis(long delayInMillis) + { + Preconditions.checkState(delayInMillis > 0); + DatabaseDescriptor.setCmsCommitRetryMaxDelay(delayInMillis); + } + + @Override + public String getCmsCommitMemberPreferencePolicy() + { + return DatabaseDescriptor.getCmsCommitMemberPreferencePolicy().name(); + } + + @Override + public void setCmsCommitMemberPreferencePolicy(String policy) + { + DatabaseDescriptor.setCmsCommitMemberPreferencePolicy(policy); + logger.info("Set cms_commit_member_preference_policy to {}", policy); + } + @Override public void initializeCMS(List ignoredEndpoints) { diff --git a/src/java/org/apache/cassandra/tcm/CMSOperationsMBean.java b/src/java/org/apache/cassandra/tcm/CMSOperationsMBean.java index efdf28267e87..4f53270ab914 100644 --- a/src/java/org/apache/cassandra/tcm/CMSOperationsMBean.java +++ b/src/java/org/apache/cassandra/tcm/CMSOperationsMBean.java @@ -24,6 +24,30 @@ public interface CMSOperationsMBean { + // CMS await timeout + public long getCmsAwaitTimeoutMillis(); + public void setCmsAwaitTimeoutMillis(long timeoutInMillis); + + // CMS commit timeout with exponential backoff + public long getCmsCommitTimeoutMillis(); + public void setCmsCommitTimeoutMillis(long timeoutInMillis); + public long getCmsCommitRetryInitialDelayMillis(); + public void setCmsCommitRetryInitialDelayMillis(long delayInMillis); + public long getCmsCommitRetryMaxDelayMillis(); + public void setCmsCommitRetryMaxDelayMillis(long delayInMillis); + + /** Get the CMS commit member preference policy + * + * @return how to choose the cms member preference order for commits + */ + public String getCmsCommitMemberPreferencePolicy(); + + /** Update the CMS commit member preference policy + * + * @param policy see Config.CMSCommitMemberPreferencePolicy + */ + public void setCmsCommitMemberPreferencePolicy(String policy); + public void initializeCMS(List ignore); public void abortInitialization(String initiator); public void resumeReconfigureCms(); @@ -53,4 +77,4 @@ public interface CMSOperationsMBean public boolean getLegacyStateListenerSyncLocalUpdates(); public void setLegacyStateListenerSyncLocalUpdates(boolean sync); -} + } diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java index 40dcb7fb4c41..be6d0bd212a9 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java @@ -62,6 +62,7 @@ import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.Keyspaces; import org.apache.cassandra.schema.ReplicationParams; +import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.accord.topology.AccordFastPath; import org.apache.cassandra.service.accord.topology.AccordStaleReplicas; @@ -221,7 +222,11 @@ public Set fullCMSMembers() return Collections.emptySet(); if (fullCMSEndpoints == null) + { + if (schema.maybeGetKeyspaceMetadata(SchemaConstants.METADATA_KEYSPACE_NAME).isEmpty()) + return Collections.emptySet(); this.fullCMSEndpoints = ImmutableSet.copyOf(placements.get(ReplicationParams.meta(this)).reads.byEndpoint().keySet()); + } return fullCMSEndpoints; } @@ -231,7 +236,11 @@ public Set fullCMSMemberIds() return Collections.emptySet(); if (fullCMSIds == null) + { + if (schema.maybeGetKeyspaceMetadata(SchemaConstants.METADATA_KEYSPACE_NAME).isEmpty()) + return Collections.emptySet(); this.fullCMSIds = placements.get(ReplicationParams.meta(this)).reads.byEndpoint().keySet().stream().map(directory::peerId).collect(toImmutableSet()); + } return fullCMSIds; } @@ -241,7 +250,11 @@ public EndpointsForRange fullCMSMembersAsReplicas() return EndpointsForRange.empty(MetaStrategy.entireRange); if (fullCMSReplicas == null) + { + if (schema.maybeGetKeyspaceMetadata(SchemaConstants.METADATA_KEYSPACE_NAME).isEmpty()) + return EndpointsForRange.empty(MetaStrategy.entireRange); fullCMSReplicas = placements.get(ReplicationParams.meta(this)).reads.forRange(MetaStrategy.entireRange).get(); + } return fullCMSReplicas; } diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java index 6caea0001fc1..a61d0e56213d 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java @@ -57,6 +57,8 @@ import org.apache.cassandra.schema.DistributedSchema; import org.apache.cassandra.schema.Keyspaces; import org.apache.cassandra.schema.ReplicationParams; +import org.apache.cassandra.service.RetryStrategy; +import org.apache.cassandra.service.TimeoutStrategy; import org.apache.cassandra.service.accord.topology.AccordFastPath; import org.apache.cassandra.service.accord.topology.AccordStaleReplicas; import org.apache.cassandra.service.consensus.migration.ConsensusMigrationState; @@ -562,7 +564,6 @@ public void revertToEpoch(Epoch epoch) /** * dumps the cluster metadata at the given epoch, returns path to the generated file - * * if the given Epoch is EMPTY, we dump the current metadata * * @param epoch dump clustermetadata at this epoch @@ -637,19 +638,41 @@ public ClusterMetadata commit(Transformation transform) /** * Attempt to commit the transformation (with retries). - * + *

* Since we can not rely on reliability of the network or even the fact that the committing node will stay alive * for the duration of commit, we have to allow for subsequent discovery of the transformation effects, which can * be made visible either by replaying the log, or receiving the metadata snapshot. - * + *

* In other words, there is no reliable way to find out whether _this particular_ transformation has been executed * while we are allowing replay from snapshot, since even failure response from the CMS does not guarantee * Paxos re-proposal, which would place the transformation into the log during proposal _by some other_ CMS node. - * + *

* Protocol does foresee the concept of EntryId that would allow discovery of the committed transformations * without changes to binary protocol, but this change was left out from the initial implementation of TCM. + *

+ * TCM Commit Timeouts + *

+ * Timeout passed to the TCM Processor + * - STARTUP: retries indefinitely with Jitter(cms_default_retry_backoff 50ms) + * - SCHEMA_CHANGE: rpc_timeout + Jitter(cms_default_retry_backoff 50ms) - fast retry, fail fast for DDL + * - Everything else (LOCAL/CMS & REMOTE/non-CMS): cms_commit_timeout (1h) + ExponentialJitter(5s-60s) - full jitter exponential backoff + *

+ * The Processor changes behavior based on whether the node is a member of the CMS or not. + * For non-CMS members (REMOTE state / RemoteProcessor): + * - Sends the commit with a TCM_COMMIT_REQ message + * - Message expires after min(cms_await_timeout, remaining sender deadline) to account for CMS node failures. + * - On failure/timeout retries use exponential backoff with full jitter to decorrelate cms await timeouts. + * For CMS members (LOCAL state / AbstractLocalProcessor) for local TCM commits: + * - The main outer retry policy described below for TCM_COMMIT_REQ is used without the message expiry, + * as the commit runs locally through Paxos without a remote message hop. + * For CMS members handling TCM_COMMIT_REQ messages (Commit.Handler): + * - Deadline is max(now + write_rpc_timeout, message.expiresAtNanos() - write_rpc_timeout), with exponential + * backoff and full jitter using the TCM admin initial/max delay. The floor guarantees at least one attempt + * window even when cms_await_timeout is misconfigured close to write_rpc_timeout. + * - The CMS member reduces its own retry deadline by write_rpc_timeout before the message expiry so it exhausts + * retries and returns an explicit failure before the sender's per-message callback fires (see Commit.Handler). * - * @param onFailure handler checks if rejection has resulted from a retry of the same trasformation. + * @param onFailure handler checks if rejection has resulted from a retry of the same trasformation. */ public T1 commit(Transformation transform, CommitSuccessHandler onSuccess, CommitFailureHandler onFailure) { @@ -662,7 +685,9 @@ public T1 commit(Transformation transform, CommitSuccessHandler onSucce // discover-own-commits via entry id in case of lost messages (in remote case) and Paxos re-proposals (in local case) Epoch highestConsecutive = log.waitForHighestConsecutive().epoch; - Commit.Result result = processor.commit(entryIdGen.get(), transform, highestConsecutive); + Retry retryPolicy = getRetryPolicy(transform.kind()); + logger.info("Committing {} with {}", transform.kind(), retryPolicy); + Commit.Result result = processor.commit(entryIdGen.get(), transform, highestConsecutive, retryPolicy); try { @@ -674,6 +699,9 @@ public T1 commit(Transformation transform, CommitSuccessHandler onSucce else { TCMMetrics.instance.recordCommitFailureLatency(nanoTime() - startTime, NANOSECONDS, result.failure().rejected); + logger.debug("Failed to commit {} after {} attempts ({}): {} {}", + transform.kind(), retryPolicy.attempts(), retryPolicy, + result.failure().code, result.failure().message); return onFailure.accept(result.failure().code, result.failure().message); } } @@ -687,6 +715,36 @@ public T1 commit(Transformation transform, CommitSuccessHandler onSucce } } + private static Retry getRetryPolicy(Transformation.Kind kind) + { + Retry retryPolicy; + if (kind == Transformation.Kind.STARTUP) + { + retryPolicy = Retry.unsafeRetryIndefinitely(); + } + else if (kind == Transformation.Kind.SCHEMA_CHANGE) + { + long deadlineNanos = nanoTime() + DatabaseDescriptor.getRpcTimeout(TimeUnit.NANOSECONDS); + retryPolicy = Retry.until(deadlineNanos, TCMMetrics.instance.commitRetries); + } + else + { + // On non-CMS members, which send commit requests via messaging to the CMS members, the exponential backoff + // with jitter works to desynchronize retry waves after a CMS await timeout. For CMS members committing + // locally, it helps to space Paxos CAS retries in the local commit loop. + long deadlineNanos = nanoTime() + DatabaseDescriptor.getCmsCommitTimeout().to(TimeUnit.NANOSECONDS); + long initialDelayMs = DatabaseDescriptor.getCmsCommitRetryInitialDelay().to(TimeUnit.MILLISECONDS); + long maxDelayMs = DatabaseDescriptor.getCmsCommitRetryMaxDelay().to(TimeUnit.MILLISECONDS); + // range of backoff wait time starts at 0ms backing off exponentially at initialDelayMs * 2^attempts + String spec = String.format("0ms ... %dms * 2^attempts <= %dms", initialDelayMs, maxDelayMs); + RetryStrategy backoffWithJitter = RetryStrategy.parse(spec, + TimeoutStrategy.LatencySourceFactory.none(), + RetryStrategy.randomizers.uniform()); + retryPolicy = Retry.until(deadlineNanos, TCMMetrics.instance.commitRetries, backoffWithJitter); + } + return retryPolicy; + } + /** * Accessors */ @@ -760,7 +818,6 @@ public ClusterMetadata metadata() /** * Fetches log entries from directly from CMS, at least to the specified epoch. - * * This operation is blocking and also waits for all retrieved log entries to be * enacted, so on return all transformations to ClusterMetadata will be visible. * @return metadata with all currently committed entries enacted. @@ -814,9 +871,7 @@ public Future fetchLogFromPeerAsync(InetAddressAndPort from, Ep } /** - * * IMPORTANT: this call can return _without_ catching us up, so should only be used privately. - * * Attempts to synchronously retrieve log entries from a non-CMS peer. * Fetches the log state representing the delta between the current local epoch and the one supplied. * This is to be used when a message from a peer contains an epoch higher than the current local epoch. As diff --git a/src/java/org/apache/cassandra/tcm/Commit.java b/src/java/org/apache/cassandra/tcm/Commit.java index 6baf00b6aaca..7550ce8ac452 100644 --- a/src/java/org/apache/cassandra/tcm/Commit.java +++ b/src/java/org/apache/cassandra/tcm/Commit.java @@ -19,6 +19,7 @@ package org.apache.cassandra.tcm; import java.io.IOException; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Supplier; @@ -29,6 +30,7 @@ import accord.utils.Invariants; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.exceptions.ExceptionCode; import org.apache.cassandra.io.IVersionedSerializer; @@ -40,6 +42,8 @@ import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.Verb; +import org.apache.cassandra.service.RetryStrategy; +import org.apache.cassandra.service.TimeoutStrategy; import org.apache.cassandra.tcm.log.Entry; import org.apache.cassandra.tcm.log.LogState; import org.apache.cassandra.tcm.membership.Directory; @@ -47,6 +51,7 @@ import org.apache.cassandra.tcm.membership.NodeVersion; import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MonotonicClock; import org.apache.cassandra.utils.vint.VIntCoding; import static org.apache.cassandra.tcm.ClusterMetadataService.State.LOCAL; @@ -371,7 +376,34 @@ public void doVerb(Message message) throws IOException { checkCMSState(); logger.info("Received commit request {} from {}", message.payload, message.from()); - Retry retryPolicy = Retry.until(message.expiresAtNanos(), TCMMetrics.instance.commitRetries); + // Reduce our local retry deadline by write_rpc_timeout so we exhaust retries and return an + // explicit failure response to the sender before their per-message callback fires to avoid + // all the non-CMS nodes being synchronized on their CMS await timeouts expiring at the same time. + // + // A single tryCommitOne (Paxos CAS) can overshoot its deadline by up to + // (cas_contention_timeout + write_rpc_timeout) because reachedMax() is only checked at the top + // of the AbstractLocalProcessor retry loop, not mid-CAS so subtracting write_rpc_timeout covers the + // overshoot and leaves a window for the failure response to propagate back over the network. + // + // Explicit failures allow non-CMS senders to immediately reschedule retries, replacing the + // thundering herd pattern (all senders TIMEOUT and retry simultaneously every ~cms_await_timeout) + // with fast individual retries paced by CMS response speed. + // + // The floor of casWriteRpcTimeoutNanos ensures at least one attempt window when cms_await_timeout is + // misconfigured close to write_rpc_timeout. + long casWriteRpcTimeoutNanos = DatabaseDescriptor.getCasContentionTimeout(TimeUnit.NANOSECONDS) + + DatabaseDescriptor.getWriteRpcTimeout(TimeUnit.NANOSECONDS); + long now = MonotonicClock.Global.preciseTime.now(); + long localDeadlineNanos = Math.max(now + casWriteRpcTimeoutNanos, + message.expiresAtNanos() - casWriteRpcTimeoutNanos); + long initialDelayMs = DatabaseDescriptor.getCmsCommitRetryInitialDelay().to(TimeUnit.MILLISECONDS); + long maxDelayMs = DatabaseDescriptor.getCmsCommitRetryMaxDelay().to(TimeUnit.MILLISECONDS); + + String spec = String.format("... %dms^attempts <= %dms", initialDelayMs, maxDelayMs); + RetryStrategy backoffWithJitter = RetryStrategy.parse(spec, + TimeoutStrategy.LatencySourceFactory.none(), + RetryStrategy.randomizers.uniform()); + Retry retryPolicy = Retry.until(localDeadlineNanos, TCMMetrics.instance.commitRetries, backoffWithJitter); Result result = processor.commit(message.payload.entryId, message.payload.transform, message.payload.lastKnown, retryPolicy); if (result.isSuccess()) { diff --git a/src/java/org/apache/cassandra/tcm/Processor.java b/src/java/org/apache/cassandra/tcm/Processor.java index b93adfcd5bb1..3deaa96b3cce 100644 --- a/src/java/org/apache/cassandra/tcm/Processor.java +++ b/src/java/org/apache/cassandra/tcm/Processor.java @@ -18,14 +18,7 @@ package org.apache.cassandra.tcm; -import com.codahale.metrics.Meter; - -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.DurationSpec; import org.apache.cassandra.metrics.TCMMetrics; -import org.apache.cassandra.service.RetryStrategy; -import org.apache.cassandra.service.TimeoutStrategy; -import org.apache.cassandra.service.WaitStrategy; import org.apache.cassandra.tcm.log.Entry; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -33,39 +26,6 @@ public interface Processor { - /** - * Method is _only_ responsible to commit the transformation to the cluster metadata. Implementers _have to ensure_ - * local visibility and enactment of the metadata! - */ - default Commit.Result commit(Entry.Id entryId, Transformation transform, Epoch lastKnown) - { - // When the cluster is bounced, it may happen that regular nodes come up earlier than CMS nodes, or CMS - // nodes come up and fail to finish the startup since other CMS nodes are not up yet, and therefore can not - // submit the STARTUP message. This allows the bounces affecting majority of CMS nodes to finish successfully. - if (transform.kind() == Transformation.Kind.STARTUP) - { - return commit(entryId, transform, lastKnown, unsafeRetryIndefinitely()); - } - - return commit(entryId, transform, lastKnown, - Retry.untilElapsed(getCmsAwaitTimeout().to(NANOSECONDS), TCMMetrics.instance.commitRetries)); - } - - /** - * To be used only when submitting a STARTUP transformation when a node is restarted with a new set of addresses or - * running a new release version. - */ - static Retry unsafeRetryIndefinitely() - { - Meter retryMeter = TCMMetrics.instance.commitRetries; - DurationSpec.IntMillisecondsBound defaultBackoff = DatabaseDescriptor.getDefaultRetryBackoff(); - DurationSpec.IntMillisecondsBound defaultMaxBackoff = DatabaseDescriptor.getDefaultMaxRetryBackoff(); - String spec = (defaultBackoff == null ? "100ms" : defaultBackoff.toMilliseconds() + "ms") - + "*attempts <=" + (defaultMaxBackoff == null ? "10s" : defaultMaxBackoff.toMilliseconds() + "ms"); - WaitStrategy wait = RetryStrategy.parse(spec, TimeoutStrategy.LatencySourceFactory.none()); - return Retry.withNoTimeLimit(retryMeter, wait); - } - Commit.Result commit(Entry.Id entryId, Transformation transform, Epoch lastKnown, Retry retryPolicy); /** diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java index 69830b64da04..7cfc1965ac41 100644 --- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java +++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java @@ -57,8 +57,8 @@ import org.apache.cassandra.tcm.log.LogState; import org.apache.cassandra.tcm.membership.Location; import org.apache.cassandra.utils.AbstractIterator; -import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MonotonicClock; import org.apache.cassandra.utils.concurrent.AsyncPromise; import org.apache.cassandra.utils.concurrent.Future; import org.apache.cassandra.utils.concurrent.Promise; @@ -305,8 +305,24 @@ else if (retry.hasExpired()) } InetAddressAndPort candidate = candidates.next(); - long waitNanos = Math.min(verb.expiresAfterNanos(), Math.max(0, retry.remainingNanos())); - Message msg = Message.outWithFlag(verb, request, MessageFlag.CALL_BACK_ON_FAILURE, Clock.Global.nanoTime() + waitNanos); + long msgExpiresAfterNanos; + if (verb == Verb.TCM_COMMIT_REQ) + { + long cmsAwaitNanos = DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS); + long remainingNanos = retry.remainingNanos(); + msgExpiresAfterNanos = Math.min(cmsAwaitNanos, remainingNanos); + logger.debug("Sending {} to {}, per-message expiry {}ms (cmsAwait={}ms, remaining={}ms)", + verb, candidate, + TimeUnit.NANOSECONDS.toMillis(msgExpiresAfterNanos), + TimeUnit.NANOSECONDS.toMillis(cmsAwaitNanos), + TimeUnit.NANOSECONDS.toMillis(remainingNanos)); + } + else + { + msgExpiresAfterNanos = verb.expiresAfterNanos(); + } + long msgExpiresAtNanos = MonotonicClock.Global.preciseTime.now() + msgExpiresAfterNanos; + Message msg = Message.outWithFlag(verb, request, MessageFlag.CALL_BACK_ON_FAILURE, msgExpiresAtNanos); MessagingService.instance().sendWithCallback(msg, candidate, new RequestCallbackWithFailure() { @Override diff --git a/src/java/org/apache/cassandra/tcm/Retry.java b/src/java/org/apache/cassandra/tcm/Retry.java index be79f10f268b..7b46029060bf 100644 --- a/src/java/org/apache/cassandra/tcm/Retry.java +++ b/src/java/org/apache/cassandra/tcm/Retry.java @@ -24,7 +24,9 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.DurationSpec; +import org.apache.cassandra.metrics.TCMMetrics; import org.apache.cassandra.service.RetryStrategy; +import org.apache.cassandra.service.TimeoutStrategy; import org.apache.cassandra.service.TimeoutStrategy.LatencySourceFactory; import org.apache.cassandra.service.WaitStrategy; @@ -35,6 +37,7 @@ public class Retry implements WaitStrategy { private static final WaitStrategy DEFAULT_STRATEGY; + private static final Retry RETRY_INDEFINITELY; static { DurationSpec.IntMillisecondsBound defaultBackoff = DatabaseDescriptor.getDefaultRetryBackoff(); @@ -47,6 +50,12 @@ public class Retry implements WaitStrategy + ",retries=" + DatabaseDescriptor.getCmsDefaultRetryMaxTries(); } DEFAULT_STRATEGY = RetryStrategy.parse(defaultSpec, LatencySourceFactory.none()); + + Meter retryMeter = TCMMetrics.instance.commitRetries; + String spec = (defaultBackoff == null ? "100ms" : defaultBackoff.toMilliseconds() + "ms") + + "*attempts <=" + (defaultMaxBackoff == null ? "10s" : defaultMaxBackoff.toMilliseconds() + "ms"); + WaitStrategy wait = RetryStrategy.parse(spec, TimeoutStrategy.LatencySourceFactory.none()); + RETRY_INDEFINITELY = Retry.withNoTimeLimit(retryMeter, wait); } public final long deadlineNanos; @@ -61,6 +70,15 @@ public Retry(long deadlineNanos, Meter retryMeter, WaitStrategy delegate) this.delegate = delegate; } + /** + * To be used only when submitting a STARTUP transformation when a node is restarted with a new set of addresses or + * running a new release version. + */ + static Retry unsafeRetryIndefinitely() + { + return RETRY_INDEFINITELY; + } + public Retry(long deadlineNanos, Meter retryMeter) { this(deadlineNanos, retryMeter, DEFAULT_STRATEGY); @@ -142,6 +160,11 @@ public static Retry until(long deadlineNanos, Meter retryMeter) return new Retry(deadlineNanos, retryMeter, DEFAULT_STRATEGY); } + public static Retry until(long deadlineNanos, Meter retryMeter, WaitStrategy waitStrategy) + { + return new Retry(deadlineNanos, retryMeter, waitStrategy); + } + public static Retry untilElapsed(long timeoutNanos, Meter retryMeter) { return new Retry(nanoTime() + timeoutNanos, retryMeter, DEFAULT_STRATEGY); diff --git a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java index d2909bd8737b..43891d5abc1c 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java +++ b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java @@ -210,8 +210,8 @@ public SequenceState executeNext() } catch (Throwable e) { + logger.warn("Exception committing startJoin, will retry", e); JVMStabilityInspector.inspectThrowable(e); - logger.warn("Exception committing startJoin", e); return continuable(); } @@ -295,8 +295,8 @@ public SequenceState executeNext() } catch (Throwable e) { + logger.warn("Exception committing finishJoin, will retry", e); JVMStabilityInspector.inspectThrowable(e); - logger.warn("Exception committing finishJoin", e); return continuable(); } ClusterMetadataService.instance().ensureCMSPlacement(metadata); diff --git a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java index 32afe234916d..552818a77443 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java +++ b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java @@ -204,8 +204,8 @@ public SequenceState executeNext() } catch (Throwable e) { + logger.warn("Exception committing startReplace, will retry", e); JVMStabilityInspector.inspectThrowable(e); - logger.warn("Got exception committing startReplace", e); return continuable(); } break; @@ -284,8 +284,8 @@ public SequenceState executeNext() } catch (Throwable e) { + logger.warn("Exception committing finishReplace, sequence will halt", e); JVMStabilityInspector.inspectThrowable(e); - logger.warn("Got exception committing finishReplace", e); return halted(); } ClusterMetadataService.instance().ensureCMSPlacement(metadata); diff --git a/src/java/org/apache/cassandra/tcm/sequences/Move.java b/src/java/org/apache/cassandra/tcm/sequences/Move.java index b4c55ba34878..cf81e5a3ce8f 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/Move.java +++ b/src/java/org/apache/cassandra/tcm/sequences/Move.java @@ -303,6 +303,7 @@ public SequenceState executeNext() } catch (Throwable t) { + logger.warn("Exception committing startMove, will retry on next TCM epoch advance", t); JVMStabilityInspector.inspectThrowable(t); return continuable(); } @@ -384,6 +385,7 @@ else if (destination.isSelf()) } catch (Throwable t) { + logger.warn("Exception committing midMove, will retry on next TCM epoch advance", t); JVMStabilityInspector.inspectThrowable(t); return continuable(); } @@ -397,6 +399,7 @@ else if (destination.isSelf()) } catch (Throwable t) { + logger.warn("Exception committing finishMove, will retry on next TCM epoch advance", t); JVMStabilityInspector.inspectThrowable(t); return continuable(); } diff --git a/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java b/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java index deca30ab9231..876caa3d965c 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java +++ b/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java @@ -183,6 +183,7 @@ public SequenceState executeNext() } catch (Throwable t) { + logger.warn("Exception committing startLeave, will retry", t); JVMStabilityInspector.inspectThrowable(t); return continuable(); } @@ -206,7 +207,7 @@ public SequenceState executeNext() } catch (Throwable t) { - logger.warn("Error committing midLeave", t); + logger.warn("Exception committing midLeave, will retry", t); JVMStabilityInspector.inspectThrowable(t); return continuable(); } @@ -219,6 +220,7 @@ public SequenceState executeNext() } catch (Throwable t) { + logger.warn("Exception committing finishLeave, will retry", t); JVMStabilityInspector.inspectThrowable(t); return continuable(); } diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/InProgressSequenceCoordinationTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/InProgressSequenceCoordinationTest.java index a365d1bc361b..ccb720a42da2 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/InProgressSequenceCoordinationTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/InProgressSequenceCoordinationTest.java @@ -332,10 +332,15 @@ public void rejectSubsequentInProgressSequence() throws Throwable @Test public void inProgressSequenceRetryTest() throws Throwable { + // This test expects ~50% of TCM_COMMIT_REQ messages to be dropped, so s + // ignificantly lower the timeout, backoff, and retry params try (Cluster cluster = builder().withNodes(1) .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(2)) .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(2, "dc0", "rack0")) - .withConfig((config) -> config.with(Feature.NETWORK, Feature.GOSSIP).set("request_timeout_in_ms", "1000")) + .withConfig((config) -> config.with(Feature.NETWORK, Feature.GOSSIP) + .set("cms_await_timeout", "100ms") + .set("cms_commit_retry_initial_delay", "10ms") + .set("cms_commit_retry_max_delay", "5000ms")) .start()) { cluster.filters() diff --git a/test/distributed/org/apache/cassandra/distributed/test/tcm/LostCommitReqResTest.java b/test/distributed/org/apache/cassandra/distributed/test/tcm/LostCommitReqResTest.java index dfa3512d09f2..a96d6fd93e60 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tcm/LostCommitReqResTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tcm/LostCommitReqResTest.java @@ -33,11 +33,18 @@ public class LostCommitReqResTest extends TestBaseImpl { + @Test public void lostMoveCommitResponseTest() throws IOException { try (Cluster cluster = init(builder().withNodes(2) - .withConfig(c -> c.with(Feature.NETWORK, Feature.GOSSIP).set("cms_await_timeout", "1s").set("cms_default_max_retries", "5")) + .withConfig( + c -> c.with(Feature.NETWORK, Feature.GOSSIP) + .set("cms_await_timeout", "1s") + .set("cms_default_max_retries", "5") + .set("cms_commit_timeout", "3s") + .set("cms_commit_retry_initial_delay", "200ms") + .set("cms_commit_retry_max_delay", "1s")) .start())) { // no commit responses @@ -60,7 +67,13 @@ public void lostMoveCommitResponseTest() throws IOException public void lostMoveCommitRequestTest() throws IOException { try (Cluster cluster = init(builder().withNodes(2) - .withConfig(c -> c.with(Feature.NETWORK, Feature.GOSSIP).set("cms_await_timeout", "1s").set("cms_default_max_retries", "5")) + .withConfig( + c -> c.with(Feature.NETWORK, Feature.GOSSIP) + .set("cms_await_timeout", "1s") + .set("cms_default_max_retries", "5") + .set("cms_commit_timeout", "3s") + .set("cms_commit_retry_initial_delay", "200ms") + .set("cms_commit_retry_max_delay", "1s")) .start())) { // no commit requests @@ -81,7 +94,13 @@ public void lostMoveCommitRequestTest() throws IOException public void lostDecomCommitResponseTest() throws IOException { try (Cluster cluster = init(builder().withNodes(2) - .withConfig(c -> c.with(Feature.NETWORK, Feature.GOSSIP).set("cms_await_timeout", "1s").set("cms_default_max_retries", "5")) + .withConfig( + c -> c.with(Feature.NETWORK, Feature.GOSSIP) + .set("cms_await_timeout", "1s") + .set("cms_default_max_retries", "5") + .set("cms_commit_timeout", "3s") + .set("cms_commit_retry_initial_delay", "200ms") + .set("cms_commit_retry_max_delay", "1s")) .start())) { cluster.filters().verbs(Verb.TCM_COMMIT_RSP.id).from(1).to(2).drop(); @@ -99,7 +118,13 @@ public void lostDecomCommitResponseTest() throws IOException public void lostDecomCommitRequestTest() throws IOException { try (Cluster cluster = init(builder().withNodes(2) - .withConfig(c -> c.with(Feature.NETWORK, Feature.GOSSIP).set("cms_await_timeout", "1s").set("cms_default_max_retries", "5")) + .withConfig( + c -> c.with(Feature.NETWORK, Feature.GOSSIP) + .set("cms_await_timeout", "1s") + .set("cms_default_max_retries", "5") + .set("cms_commit_timeout", "3s") + .set("cms_commit_retry_initial_delay", "200ms") + .set("cms_commit_retry_max_delay", "1s")) .start())) { cluster.filters().verbs(Verb.TCM_COMMIT_REQ.id).from(2).to(1).drop(); diff --git a/test/distributed/org/apache/cassandra/distributed/test/tcm/RemoteCmsCommitTimeoutDifferentiationTest.java b/test/distributed/org/apache/cassandra/distributed/test/tcm/RemoteCmsCommitTimeoutDifferentiationTest.java new file mode 100644 index 000000000000..67b01ef1109b --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/tcm/RemoteCmsCommitTimeoutDifferentiationTest.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.tcm; + +import java.util.concurrent.TimeUnit; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.net.Verb; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.tcm.CMSOperations; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.sequences.InProgressSequences; + +import static org.apache.cassandra.utils.Clock.Global.nanoTime; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests for CMS commit timeout differentiation across admin, schema, and default commit paths. + * Uses a shared cluster to reduce setup time. Each test resets all hot-modifiable timeout + * properties to defaults and clears message filters before running. + */ +public class RemoteCmsCommitTimeoutDifferentiationTest extends TestBaseImpl +{ + private static Cluster CLUSTER; + + // Config defaults (matching Config.java field initializers) + private static final long DEFAULT_CMS_AWAIT_TIMEOUT_MS = 120_000L; + private static final long DEFAULT_CMS_COMMIT_TIMEOUT_MS = 3_600_000L; + private static final long DEFAULT_CMS_COMMIT_RETRY_INITIAL_DELAY_MS = 5_000L; + private static final long DEFAULT_CMS_COMMIT_RETRY_MAX_DELAY_MS = 60_000L; + + @BeforeClass + public static void beforeClass() throws Throwable + { + TestBaseImpl.beforeClass(); + CLUSTER = init(Cluster.build(2) + .withConfig(c -> c.with(Feature.NETWORK, Feature.GOSSIP)) + .start()); + } + + @AfterClass + public static void afterClass() + { + if (CLUSTER != null) + CLUSTER.close(); + } + + @Before + public void setUp() + { + // Clear all message filters + CLUSTER.filters().reset(); + + // Reset all hot-modifiable timeout properties to defaults on ALL instances + CLUSTER.forEach(i -> i.runOnInstance(() -> { + CMSOperations.instance.setCmsAwaitTimeoutMillis(DEFAULT_CMS_AWAIT_TIMEOUT_MS); + CMSOperations.instance.setCmsCommitTimeoutMillis(DEFAULT_CMS_COMMIT_TIMEOUT_MS); + CMSOperations.instance.setCmsCommitRetryInitialDelayMillis(DEFAULT_CMS_COMMIT_RETRY_INITIAL_DELAY_MS); + CMSOperations.instance.setCmsCommitRetryMaxDelayMillis(DEFAULT_CMS_COMMIT_RETRY_MAX_DELAY_MS); + })); + + // Verify all instances are up to date on TCM log + ClusterUtils.waitForCMSToQuiesce(CLUSTER, CLUSTER.get(1)); + + // Verify no pending topology change operations + CLUSTER.forEach(i -> i.runOnInstance(() -> { + InProgressSequences seqs = ClusterMetadata.current().inProgressSequences; + assertTrue("Expected no in-progress sequences on instance, but found some", seqs.isEmpty()); + })); + } + + // ==================== Admin timeout tests ==================== + + /** + * Move with dropped TCM_COMMIT_REQ fails near cms_commit_timeout (3s), not cms_await_timeout (120s). + * After failure, restores filter and verifies move can be cancelled. + */ + @Test + public void moveUsesAdminTimeout() + { + setOnAllInstances(3_000L, 200L, 1_000L); + + CLUSTER.filters().verbs(Verb.TCM_COMMIT_REQ.id).from(2).to(1).drop(); + + long start = nanoTime(); + CLUSTER.get(2).nodetoolResult("move", "1234").asserts().failure(); + long elapsedMs = TimeUnit.NANOSECONDS.toMillis(nanoTime() - start); + + assertTrue("Move should fail near cms_commit_timeout (3s), but took " + elapsedMs + "ms", + elapsedMs < 30_000); + assertMoveFailed(CLUSTER.get(2)); + + // Restore filter and cancel the failed move + CLUSTER.filters().reset(); + cancelInProgressSequencesOnNode(CLUSTER.get(2)); + } + + /** + * Schema DDL is NOT routed through admin timeout — it should fail near cms_await_timeout + * (set to 3s), well under the admin timeout (60s). + */ + @Test + public void schemaNotRoutedThroughAdminTimeout() + { + // Short cms_await_timeout, long admin timeout + CLUSTER.forEach(i -> i.runOnInstance(() -> { + CMSOperations.instance.setCmsAwaitTimeoutMillis(3_000L); + CMSOperations.instance.setCmsCommitTimeoutMillis(60_000L); + })); + + CLUSTER.filters().verbs(Verb.TCM_COMMIT_REQ.id).from(2).to(1).drop(); + + long start = nanoTime(); + try + { + CLUSTER.coordinator(2).execute( + "CREATE KEYSPACE IF NOT EXISTS schema_not_admin_ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}", + ConsistencyLevel.ONE); + } + catch (Exception e) + { + // Expected timeout + } + long elapsedMs = TimeUnit.NANOSECONDS.toMillis(nanoTime() - start); + + assertTrue("Schema DDL should fail near cms_await_timeout (3s), NOT admin timeout (60s), but took " + elapsedMs + "ms", + elapsedMs < 30_000); + } + + /** + * JMX runtime modification of cms_commit_timeout is a hot property and takes effect immediately. + */ + @Test + public void cmsCommitTimeoutIsHotProperty() + { + // Verify initial default + long initial = CLUSTER.get(2).callOnInstance(() -> CMSOperations.instance.getCmsCommitTimeoutMillis()); + assertEquals("Initial cms_commit_timeout should be default (1h)", DEFAULT_CMS_COMMIT_TIMEOUT_MS, initial); + + // Update via JMX on the node that will attempt the move + setOnAllInstances(3_000L, 200L, 1_000L); + + long updated = CLUSTER.get(2).callOnInstance(() -> CMSOperations.instance.getCmsCommitTimeoutMillis()); + assertEquals("Updated cms_commit_timeout should be 3s", 3_000L, updated); + + // Drop commits and verify the move fails with the new shorter timeout + CLUSTER.filters().verbs(Verb.TCM_COMMIT_REQ.id).from(2).to(1).drop(); + + long start = nanoTime(); + CLUSTER.get(2).nodetoolResult("move", "2345").asserts().failure(); + long elapsedMs = TimeUnit.NANOSECONDS.toMillis(nanoTime() - start); + + assertTrue("Move should fail near updated 3s timeout, but took " + elapsedMs + "ms", + elapsedMs < 30_000); + + // Restore filter and cancel + CLUSTER.filters().reset(); + cancelInProgressSequencesOnNode(CLUSTER.get(2)); + } + + /** + * cms_await_timeout is settable at runtime and takes effect. + */ + @Test + public void cmsAwaitTimeoutIsHotProperty() + { + long initial = CLUSTER.get(1).callOnInstance(() -> CMSOperations.instance.getCmsAwaitTimeoutMillis()); + assertEquals("Initial cms_await_timeout should be 120s", DEFAULT_CMS_AWAIT_TIMEOUT_MS, initial); + + CLUSTER.forEach(i -> i.runOnInstance(() -> CMSOperations.instance.setCmsAwaitTimeoutMillis(5_000L))); + long updated = CLUSTER.get(1).callOnInstance(() -> CMSOperations.instance.getCmsAwaitTimeoutMillis()); + assertEquals("cms_await_timeout should be 5s", 5_000L, updated); + } + + // ==================== Helpers ==================== + + /** + * Set admin timeout properties on ALL instances. + */ + private void setOnAllInstances(long adminTimeoutMs, long retryInitialDelayMs, long retryMaxDelayMs) + { + CLUSTER.forEach(i -> i.runOnInstance(() -> { + CMSOperations.instance.setCmsCommitTimeoutMillis(adminTimeoutMs); + CMSOperations.instance.setCmsCommitRetryInitialDelayMillis(retryInitialDelayMs); + CMSOperations.instance.setCmsCommitRetryMaxDelayMillis(retryMaxDelayMs); + })); + } + + private static void assertMoveFailed(IInvokableInstance i) + { + String mode = i.callOnInstance(() -> StorageService.instance.operationMode().toString()); + assertEquals(StorageService.Mode.MOVE_FAILED.toString(), mode); + } + + /** + * Cancel any in-progress topology sequences for the given node and verify cleanup. + */ + private static void cancelInProgressSequencesOnNode(IInvokableInstance instance) + { + instance.runOnInstance(() -> { + StorageService.cancelInProgressSequences(ClusterMetadata.current().myNodeId()); + }); + + // Wait briefly for cancellation to propagate + try { Thread.sleep(500); } catch (InterruptedException ignored) {} + + // Verify the cancellation succeeded + instance.runOnInstance(() -> { + InProgressSequences seqs = ClusterMetadata.current().inProgressSequences; + assertTrue("Expected no in-progress sequences after cancellation", seqs.isEmpty()); + }); + } +} diff --git a/test/unit/org/apache/cassandra/tcm/CommitTimeoutPathsTest.java b/test/unit/org/apache/cassandra/tcm/CommitTimeoutPathsTest.java new file mode 100644 index 000000000000..835b7d7b6aca --- /dev/null +++ b/test/unit/org/apache/cassandra/tcm/CommitTimeoutPathsTest.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.guardrails.Guardrails; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.exceptions.ExceptionCode; +import org.apache.cassandra.locator.MetaStrategy; +import org.apache.cassandra.schema.DistributedSchema; +import org.apache.cassandra.tcm.log.Entry; +import org.apache.cassandra.tcm.log.LocalLog; +import org.apache.cassandra.tcm.log.LogState; +import org.apache.cassandra.tcm.ownership.UniformRangePlacement; +import org.apache.cassandra.tcm.transformations.cms.Initialize; +import org.apache.cassandra.utils.FBUtilities; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for timeout and exception paths in the TCM commit pipeline that were + * identified as uncovered by code coverage analysis: + * + * 1. handleCommitResult TimeoutException — commit succeeds but follower can't enact + * 2. handleCommitResult failure callback — commit returns failure + */ +public class CommitTimeoutPathsTest +{ + @BeforeClass + public static void setup() + { + DatabaseDescriptor.daemonInitialization(); + DatabaseDescriptor.setDefaultKeyspaceRF(1); + Guardrails.instance.setMinimumReplicationFactorThreshold(1, 1); + try + { + DatabaseDescriptor.setBroadcastAddress(java.net.InetAddress.getByName("127.0.0.1")); + } + catch (java.net.UnknownHostException e) + { + throw new RuntimeException(e); + } + } + + @After + public void cleanup() + { + try { ClusterMetadataService.unsetInstance(); } catch (Exception ignored) {} + } + + /** + * When processor.commit() returns a Failure result, handleCommitResult should invoke + * the onFailure handler. We inject a Processor that always returns failure. + */ + @Test + public void testHandleCommitResultFailureCallback() throws Exception + { + ClusterMetadata initial = new ClusterMetadata(Murmur3Partitioner.instance); + LocalLog log = LocalLog.logSpec() + .sync() + .withInitialState(initial) + .createLog(); + + // Create a Processor that can fail commits + AtomicBoolean succeed = new AtomicBoolean(true); + AtomicLongBackedProcessor realProcessor = new AtomicLongBackedProcessor(log); + Processor testProcessor = new Processor() + { + @Override + public Commit.Result commit(Entry.Id entryId, Transformation transform, Epoch lastKnown, Retry retryPolicy) + { + if (succeed.get()) return realProcessor.commit(entryId, transform, lastKnown, retryPolicy); + else return Commit.Result.failed(ExceptionCode.SERVER_ERROR, "injected failure for testing"); + } + + @Override + public ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry retryPolicy) + { + if (succeed.get()) return realProcessor.fetchLogAndWait(waitFor, retryPolicy); + else return ClusterMetadata.current(); + } + }; + + ClusterMetadataService service = new ClusterMetadataService(new UniformRangePlacement(), + MetadataSnapshots.NO_OP, + log, + testProcessor, + Commit.Replicator.NO_OP, + true); + ClusterMetadataService.setInstance(service); + log.readyUnchecked(); + log.unsafeBootstrapForTesting(FBUtilities.getBroadcastAddressAndPort()); + service.commit(new Initialize(ClusterMetadata.current()) { + public Result execute(ClusterMetadata prev) + { + ClusterMetadata next = baseState; + DistributedSchema initialSchema = new DistributedSchema(prev.schema.getKeyspaces()); + ClusterMetadata.Transformer transformer = next.transformer().with(initialSchema); + return Transformation.success(transformer, MetaStrategy.affectedRanges(prev)); + } + }); + + succeed.set(false); + + // Now test the failure path via commit(transform, onSuccess, onFailure) + AtomicReference failureMessage = new AtomicReference<>(); + AtomicReference failureCode = new AtomicReference<>(); + + Transformation noopTransform = new Transformation() + { + public Kind kind() { return Kind.CUSTOM; } + public Result execute(ClusterMetadata metadata) + { + return Transformation.success(metadata.transformer(), MetaStrategy.affectedRanges(metadata)); + } + }; + + String result = service.commit(noopTransform, + metadata -> "success", (code, message) -> { + failureCode.set(code); + failureMessage.set(message); + return "failure:" + message; + }); + + assertEquals("failure:injected failure for testing", result); + assertEquals(ExceptionCode.SERVER_ERROR, failureCode.get()); + assertNotNull(failureMessage.get()); + assertTrue(failureMessage.get().contains("injected failure")); + } + + /** + * When commit succeeds but awaitAtLeast(epoch) fails because the log can't catch up + * to the committed epoch, handleCommitResult should throw IllegalStateException. + */ + @Test + public void testHandleCommitResultTimeoutException() + { + ClusterMetadata initial = new ClusterMetadata(Murmur3Partitioner.instance); + LocalLog log = LocalLog.logSpec() + .sync() + .withInitialState(initial) + .createLog(); + + /* Create a Processor that can returns a Success result with a far-future epoch + * that the local log will never reach, causing awaitAtLeast to fail. + */ + AtomicBoolean succeed = new AtomicBoolean(true); + AtomicLongBackedProcessor realProcessor = new AtomicLongBackedProcessor(log); + Processor testProcessor = new Processor() + { + @Override + public Commit.Result commit(Entry.Id entryId, Transformation transform, Epoch lastKnown, Retry retryPolicy) + { + if (succeed.get()) + { + return realProcessor.commit(entryId, transform, lastKnown, retryPolicy); + } + else + { + Epoch farFuture = Epoch.create(999999); + return new Commit.Result.Success(farFuture, LogState.EMPTY); + } + } + + @Override + public ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry retryPolicy) + { + return ClusterMetadata.current(); + } + }; + ClusterMetadataService service = new ClusterMetadataService(new UniformRangePlacement(), + MetadataSnapshots.NO_OP, + log, + testProcessor, + Commit.Replicator.NO_OP, + true); + ClusterMetadataService.setInstance(service); + log.readyUnchecked(); + log.unsafeBootstrapForTesting(FBUtilities.getBroadcastAddressAndPort()); + service.commit(new Initialize(ClusterMetadata.current()) { + public Result execute(ClusterMetadata prev) + { + ClusterMetadata next = baseState; + DistributedSchema initialSchema = new DistributedSchema(prev.schema.getKeyspaces()); + ClusterMetadata.Transformer transformer = next.transformer().with(initialSchema); + return Transformation.success(transformer, MetaStrategy.affectedRanges(prev)); + } + }); + + // Make the processor returns success with a far-future epoch the log can never reach + succeed.set(false); + + Transformation noopTransform = new Transformation() + { + public Kind kind() { return Kind.CUSTOM; } + public Result execute(ClusterMetadata metadata) + { + return Transformation.success(metadata.transformer(), MetaStrategy.affectedRanges(metadata)); + } + }; + + try + { + service.commit(noopTransform, + metadata -> "success", (code, message) -> { throw new RuntimeException("unexpected failure: " + message); }); + fail("Expected IllegalStateException wrapping TimeoutException"); + } + catch (IllegalStateException e) + { + assertTrue("Should mention unreachable epoch or timeout: " + e.getMessage(), + e.getMessage().contains("Timed out") || e.getMessage().contains("Could not reach")); + assertTrue("Should mention epoch: " + e.getMessage(), + e.getMessage().contains("999999")); + } + } +} diff --git a/test/unit/org/apache/cassandra/tcm/RetryTest.java b/test/unit/org/apache/cassandra/tcm/RetryTest.java index e6a90903e9c8..5fe2c7ebbe28 100644 --- a/test/unit/org/apache/cassandra/tcm/RetryTest.java +++ b/test/unit/org/apache/cassandra/tcm/RetryTest.java @@ -23,6 +23,7 @@ import com.codahale.metrics.Meter; +import org.assertj.core.api.Assertions; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -30,21 +31,21 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.service.RetryStrategy; +import org.apache.cassandra.service.TimeoutStrategy; import org.apache.cassandra.service.WaitStrategy; -import org.apache.cassandra.tcm.log.Entry; -import org.apache.cassandra.tcm.membership.NodeVersion; -import org.apache.cassandra.tcm.transformations.Startup; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; -import static org.apache.cassandra.tcm.membership.MembershipUtils.node; -import static org.apache.cassandra.tcm.membership.MembershipUtils.nodeAddresses; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class RetryTest { private static final Logger logger = LoggerFactory.getLogger(RetryTest.class); private Random random; + private static final Meter testMeter = new Meter(); @BeforeClass public static void setupClass() @@ -68,7 +69,10 @@ public void testRetryWithNoTimeLimitObservesTimeUnit() WaitStrategy fixed = new WaitStrategy() { @Override - public long computeWaitUntil(int attempts) {throw new UnsupportedOperationException();} + public long computeWaitUntil(int attempts) + { + throw new UnsupportedOperationException(); + } @Override public long computeWait(int attempts, TimeUnit units) @@ -87,25 +91,84 @@ public long computeWait(int attempts, TimeUnit units) @Test public void testProcessorIndefiniteRetryBehaviour() { - new Processor() + Retry retryPolicy = Retry.unsafeRetryIndefinitely(); + // Assert the properties of the Retry provided by the private static Processor::unsafeRetryIndefinitely + for (int i = 1; i < 1000; i++) { - @Override - public ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry retryPolicy) {return null;} + // backoff increases in 100ms steps, up to a max of 10000ms + long waitTime = retryPolicy.computeWait(i, MILLISECONDS); + assertEquals(Math.min((i + 1) * 100, 10000), waitTime); + } + // Retry indefinitely means no explicit deadline is set + assertEquals(Long.MAX_VALUE, retryPolicy.deadlineNanos); + } - @Override - public Commit.Result commit(Entry.Id entryId, Transformation transform, Epoch lastKnown, Retry retryPolicy) - { - // Assert the properties of the Retry provided by the private static Processor::unsafeRetryIndefinitely - for (int i = 1; i<1000;i++) - { - // backoff increases in 100ms steps, up to a max of 10000ms - long waitTime = retryPolicy.computeWait(i, MILLISECONDS); - assertEquals(Math.min((i+1) * 100, 10000), waitTime); - } - // Retry indefinitely means no explicit deadline is set - assertEquals(Long.MAX_VALUE, retryPolicy.deadlineNanos); - return null; - } - }.commit(new Entry.Id(0L), new Startup(node(random), nodeAddresses(random), NodeVersion.CURRENT), Epoch.EMPTY); + @Test + public void testExponentialJitterValueDistribution() + { + String spec = String.format("0ms ... %dms * 2^attempts <= %dms", 100, 10000); + WaitStrategy jitter = RetryStrategy.parse(spec, + TimeoutStrategy.LatencySourceFactory.none(), + RetryStrategy.randomizers.uniform()); + for (int i = 0; i < 1000; i++) + { + long sleep = jitter.computeWait(i, TimeUnit.MILLISECONDS); + Assertions.assertThat(sleep).isNotNegative().isLessThanOrEqualTo(10000); + } + } + + @Test + public void testExponentialJitterEarlyAttemptsSmall() + { + // With base=100ms and cap=60000ms, first attempt max = min(60000, 100 * 2^1) = 200ms + String spec = String.format("0ms ... %dms * 2^attempts <= %dms", 100, 60000); + WaitStrategy jitter = RetryStrategy.parse(spec, + TimeoutStrategy.LatencySourceFactory.none(), + RetryStrategy.randomizers.uniform()); + long firstSleep = jitter.computeWait(1, TimeUnit.MILLISECONDS); + // tries is 0 initially, so expBackoff = min(60000, 100 * 2^0) = 100 + Assertions.assertThat(firstSleep).isNotNegative().isLessThan(100); + } + + @Test + public void testExponentialJitterWithDeadline() + { + long deadlineNanos = System.nanoTime() + 100_000_000L; // 100ms from now + String spec = String.format("0ms ... %dms * 2^attempts <= %dms", 100, 1000); + WaitStrategy jitter = RetryStrategy.parse(spec, + TimeoutStrategy.LatencySourceFactory.none(), + RetryStrategy.randomizers.uniform()); + Retry deadline = Retry.until(deadlineNanos, testMeter, jitter); + + assertFalse("Should not have reached deadline yet", deadline.hasExpired()); + assertTrue("Remaining should be positive", deadline.remainingNanos() > 0); + } + + @Test + public void testExponentialJitterWithExpiredDeadline() throws InterruptedException + { + long deadlineNanos = System.nanoTime() + 1_000_000L; // 1ms from now + String spec = String.format("0ms ... %dms * 2^attempts <= %dms", 100, 1000); + WaitStrategy jitter = RetryStrategy.parse(spec, + TimeoutStrategy.LatencySourceFactory.none(), + RetryStrategy.randomizers.uniform()); + Retry deadline = Retry.until(deadlineNanos, testMeter, jitter); + + Thread.sleep(5); // ensure deadline passes + assertTrue("Should have reached deadline", deadline.hasExpired()); + assertEquals("Remaining should be 0", 0, deadline.remainingNanos()); + } + + @Test + public void testExponentialJitterOverflowProtection() + { + // Regardless of the number of attemps, the jittered wait should be capped at 5s + String spec = String.format("0ms ... %dms * 2^attempts <= %dms", 100, 5000); + WaitStrategy jitter = RetryStrategy.parse(spec, + TimeoutStrategy.LatencySourceFactory.none(), + RetryStrategy.randomizers.uniform()); + // Should not throw or return negative + long sleep = jitter.computeWait(500, TimeUnit.MILLISECONDS); + assertTrue("Sleep should be within specified min/max even after 500 tries: " + sleep, sleep >= 0 && sleep <= 5000); } } From c99155c691560e49d02478ed55f37b20686b714b Mon Sep 17 00:00:00 2001 From: Sam Tunnicliffe Date: Tue, 23 Jun 2026 12:21:14 +0100 Subject: [PATCH 2/4] Pull RetryStrategy for CMS commit operations into DatabaseDescriptor --- .../cassandra/config/DatabaseDescriptor.java | 41 +++++++++++++++++++ .../cassandra/tcm/ClusterMetadataService.java | 9 +--- src/java/org/apache/cassandra/tcm/Commit.java | 9 +--- 3 files changed, 43 insertions(+), 16 deletions(-) diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index fe855721598e..981604736c4a 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -124,8 +124,10 @@ import org.apache.cassandra.security.SSLFactory; import org.apache.cassandra.service.CacheService.CacheType; import org.apache.cassandra.service.FileSystemOwnershipCheck; +import org.apache.cassandra.service.RetryStrategy; import org.apache.cassandra.service.StartupChecks; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.TimeoutStrategy; import org.apache.cassandra.service.accord.AccordService; import org.apache.cassandra.service.accord.api.AccordWaitStrategies; import org.apache.cassandra.service.consensus.TransactionalMode; @@ -270,6 +272,15 @@ public class DatabaseDescriptor public static volatile boolean allowUnlimitedConcurrentValidations = ALLOW_UNLIMITED_CONCURRENT_VALIDATIONS.getBoolean(); + /** + * RetryStrategy which provides exponential backoff with full jitter, for use by both CMS and non-CMS members + * when submitting a Commit request. The range and increments of the backoff times are defined by + * conf.cms_commit_retry_initial_delay and conf.cms_commit_retry_max_delay. Both are hot properties and so + * changing either one causes this retry strategy to be reconstructed. + */ + private static volatile RetryStrategy cms_commit_retry_strategy; + + /** * The configuration for guardrails. */ @@ -580,6 +591,8 @@ private static void applyAll() throws ConfigurationException applyAccord(); + applyCMS(); + applyStartupChecks(); } @@ -1388,6 +1401,25 @@ private static void applyAccord() AccordService.applyProtocolModifiers(getAccord()); } + private static void applyCMS() + { + try + { + long initialDelayMs = conf.cms_commit_retry_initial_delay.to(TimeUnit.MILLISECONDS); + long maxDelayMs = conf.cms_commit_retry_max_delay.to(TimeUnit.MILLISECONDS); + // range of backoff wait time starts at 0ms backing off exponentially at initialDelayMs * 2^attempts + String spec = String.format("0ms ... %dms * 2^attempts <= %dms", initialDelayMs, maxDelayMs); + logger.debug("Initializing cms_commit_retry_strategy from spec: " + spec); + cms_commit_retry_strategy = RetryStrategy.parse(spec, + TimeoutStrategy.LatencySourceFactory.none(), + RetryStrategy.randomizers.uniform()); + } + catch (Exception e) + { + throw new ConfigurationException("Invalid configuration for cms_commit_retry_strategy. " + e.getMessage(), e); + } + } + public static StartupChecksConfiguration getStartupChecksConfiguration() { return startupChecksConfiguration; @@ -6200,6 +6232,8 @@ public static void setCmsCommitTimeout(long timeoutInMillis) } } + + public static DurationSpec getCmsCommitRetryInitialDelay() { return conf.cms_commit_retry_initial_delay; @@ -6211,6 +6245,7 @@ public static void setCmsCommitRetryInitialDelay(long delayInMillis) { logger.info("Setting cms_commit_retry_initial_delay to {}ms", delayInMillis); conf.cms_commit_retry_initial_delay = new DurationSpec.LongMillisecondsBound(delayInMillis); + applyCMS(); } } @@ -6225,9 +6260,15 @@ public static void setCmsCommitRetryMaxDelay(long delayInMillis) { logger.info("Setting cms_commit_retry_max_delay to {}ms", delayInMillis); conf.cms_commit_retry_max_delay = new DurationSpec.LongMillisecondsBound(delayInMillis); + applyCMS(); } } + public static RetryStrategy getCmsCommitRetryStrategy() + { + return cms_commit_retry_strategy; + } + public static int getEpochAwareDebounceInFlightTrackerMaxSize() { return conf.epoch_aware_debounce_inflight_tracker_max_size; diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java index a61d0e56213d..6d1b054becdc 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java @@ -58,7 +58,6 @@ import org.apache.cassandra.schema.Keyspaces; import org.apache.cassandra.schema.ReplicationParams; import org.apache.cassandra.service.RetryStrategy; -import org.apache.cassandra.service.TimeoutStrategy; import org.apache.cassandra.service.accord.topology.AccordFastPath; import org.apache.cassandra.service.accord.topology.AccordStaleReplicas; import org.apache.cassandra.service.consensus.migration.ConsensusMigrationState; @@ -733,13 +732,7 @@ else if (kind == Transformation.Kind.SCHEMA_CHANGE) // with jitter works to desynchronize retry waves after a CMS await timeout. For CMS members committing // locally, it helps to space Paxos CAS retries in the local commit loop. long deadlineNanos = nanoTime() + DatabaseDescriptor.getCmsCommitTimeout().to(TimeUnit.NANOSECONDS); - long initialDelayMs = DatabaseDescriptor.getCmsCommitRetryInitialDelay().to(TimeUnit.MILLISECONDS); - long maxDelayMs = DatabaseDescriptor.getCmsCommitRetryMaxDelay().to(TimeUnit.MILLISECONDS); - // range of backoff wait time starts at 0ms backing off exponentially at initialDelayMs * 2^attempts - String spec = String.format("0ms ... %dms * 2^attempts <= %dms", initialDelayMs, maxDelayMs); - RetryStrategy backoffWithJitter = RetryStrategy.parse(spec, - TimeoutStrategy.LatencySourceFactory.none(), - RetryStrategy.randomizers.uniform()); + RetryStrategy backoffWithJitter = DatabaseDescriptor.getCmsCommitRetryStrategy(); retryPolicy = Retry.until(deadlineNanos, TCMMetrics.instance.commitRetries, backoffWithJitter); } return retryPolicy; diff --git a/src/java/org/apache/cassandra/tcm/Commit.java b/src/java/org/apache/cassandra/tcm/Commit.java index 7550ce8ac452..7eda9b752300 100644 --- a/src/java/org/apache/cassandra/tcm/Commit.java +++ b/src/java/org/apache/cassandra/tcm/Commit.java @@ -43,7 +43,6 @@ import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.Verb; import org.apache.cassandra.service.RetryStrategy; -import org.apache.cassandra.service.TimeoutStrategy; import org.apache.cassandra.tcm.log.Entry; import org.apache.cassandra.tcm.log.LogState; import org.apache.cassandra.tcm.membership.Directory; @@ -396,13 +395,7 @@ public void doVerb(Message message) throws IOException long now = MonotonicClock.Global.preciseTime.now(); long localDeadlineNanos = Math.max(now + casWriteRpcTimeoutNanos, message.expiresAtNanos() - casWriteRpcTimeoutNanos); - long initialDelayMs = DatabaseDescriptor.getCmsCommitRetryInitialDelay().to(TimeUnit.MILLISECONDS); - long maxDelayMs = DatabaseDescriptor.getCmsCommitRetryMaxDelay().to(TimeUnit.MILLISECONDS); - - String spec = String.format("... %dms^attempts <= %dms", initialDelayMs, maxDelayMs); - RetryStrategy backoffWithJitter = RetryStrategy.parse(spec, - TimeoutStrategy.LatencySourceFactory.none(), - RetryStrategy.randomizers.uniform()); + RetryStrategy backoffWithJitter = DatabaseDescriptor.getCmsCommitRetryStrategy(); Retry retryPolicy = Retry.until(localDeadlineNanos, TCMMetrics.instance.commitRetries, backoffWithJitter); Result result = processor.commit(message.payload.entryId, message.payload.transform, message.payload.lastKnown, retryPolicy); if (result.isSuccess()) From b928235609fbbd4806cc90d27afe30e36bb5942b Mon Sep 17 00:00:00 2001 From: Sam Tunnicliffe Date: Thu, 25 Jun 2026 12:19:47 +0100 Subject: [PATCH 3/4] Use specific retry deadline when fetching log during a failure to commit --- .../cassandra/tcm/AbstractLocalProcessor.java | 13 +++++++++++-- .../apache/cassandra/tcm/PaxosBackedProcessor.java | 9 +++++++++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java index 41c7ebbeb503..382bba43a967 100644 --- a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java +++ b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java @@ -23,6 +23,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.metrics.TCMMetrics; import org.apache.cassandra.tcm.log.Entry; import org.apache.cassandra.tcm.log.LocalLog; import org.apache.cassandra.tcm.log.LogState; @@ -84,7 +85,11 @@ public final Commit.Result commit(Entry.Id entryId, Transformation transform, fi // Just try to catch up to the latest distributed state. if (result.isRejected()) { - ClusterMetadata replayed = fetchLogAndWait(null, retryPolicy); + // Use a dedicated retry policy here as the one for the commit itself may not be appropriate. + // It uses the wrong metric and for STARTUP transformations will retry indefinitely, which is not + // what we want here. + Retry fetchLogRetry = Retry.until(retryPolicy.deadlineNanos, TCMMetrics.instance.fetchLogRetries); + ClusterMetadata replayed = fetchLogAndWait(null, fetchLogRetry); // Retry if replay has changed the epoch, return rejection otherwise. if (!replayed.epoch.isAfter(previous.epoch)) @@ -124,7 +129,11 @@ public final Commit.Result commit(Entry.Id entryId, Transformation transform, fi if (!retryPolicy.maybeSleep()) break; // TODO: could also add epoch from mis-application from [applied]. - fetchLogAndWait(null, retryPolicy); + // Use a dedicated retry policy here as the one for the commit itself may not be appropriate. + // It uses the wrong metric and for STARTUP transformations will retry indefinitely, which is not + // what we want here. + Retry fetchLogRetry = Retry.until(retryPolicy.deadlineNanos, TCMMetrics.instance.fetchLogRetries); + fetchLogAndWait(null, fetchLogRetry); } } catch (Throwable e) diff --git a/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java b/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java index 03f9afe0537c..4e18964e4df0 100644 --- a/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java +++ b/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java @@ -214,6 +214,15 @@ public void retry() condition = new AsyncPromise<>(); messagingService.sendWithCallback(Message.out(Verb.TCM_FETCH_CMS_LOG_REQ, request), to.endpoint(), this); } + + @Override + public String toString() + { + return "FetchLogRequest{" + + "to=" + to.endpoint() + + ", request=" + request + + '}'; + } } From ba08d4dadf35c53db1288b5fcbdb6c26e0ecbc8c Mon Sep 17 00:00:00 2001 From: Sam Tunnicliffe Date: Tue, 30 Jun 2026 10:03:25 +0100 Subject: [PATCH 4/4] Relocate CMS member selection policy mbean methods --- .../apache/cassandra/service/StorageService.java | 13 ------------- .../cassandra/service/StorageServiceMBean.java | 12 ------------ 2 files changed, 25 deletions(-) diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 2ad2a5bf278d..53f0986e0cd2 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1761,19 +1761,6 @@ public String listConsensusMigrations(@Nullable Set keyspaceNames, return pojoMapToString(snapshotAsMap, format); } - @Override - public String getCmsCommitMemberPreferencePolicy() - { - return DatabaseDescriptor.getCmsCommitMemberPreferencePolicy().name(); - } - - @Override - public void setCmsCommitMemberPreferencePolicy(String policy) - { - DatabaseDescriptor.setCmsCommitMemberPreferencePolicy(policy); - logger.info("Set cms_commit_member_preference_policy to {}", policy); - } - public Map> getConcurrency(List stageNames) { Stream stageStream = stageNames.isEmpty() ? stream(Stage.values()) : stageNames.stream().map(Stage::fromPoolName); diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 34db8b418057..b9f47b3b2dd6 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -1170,18 +1170,6 @@ Integer finishConsensusMigration(@Nonnull String keyspace, List getAccordManagedKeyspaces(); List getAccordManagedTables(); - /** Get the CMS commit member preference policy - * - * @return how to choose the cms member preference order for commits - */ - public String getCmsCommitMemberPreferencePolicy(); - - /** Update the CMS commit member preference policy - * - * @param policy see Config.CMSCommitMemberPreferencePolicy - */ - public void setCmsCommitMemberPreferencePolicy(String policy); - /** Gets the concurrency settings for processing stages*/ static class StageConcurrency implements Serializable {