Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,10 @@ public class MutationTrackingSpec
* The interval at which the background reconciliation process runs
*/
public volatile DurationSpec.LongMillisecondsBound background_reconciliation_interval = new DurationSpec.LongMillisecondsBound("1s");
/**
* Minimum time to wait before re-issuing a pull request for the same coordinator log
* during background reconciliation. Decoupled from {@link #background_reconciliation_interval}
* so that suppression spans multiple scheduler ticks.
*/
public volatile DurationSpec.LongMillisecondsBound background_reconciliation_request_cooldown = new DurationSpec.LongMillisecondsBound("3s");
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ public static MutationTrackingMetrics instance()

public final Counter broadcastOffsetsDiscovered; // Newly-witnessed offsets discovered via broadcast
public final Counter writeTimeOffsetsDiscovered; // Newly-witnessed offsets discovered at write time
public final Counter backgroundPullRequestsSent; // PullMutationsRequest messages issued by background reconciliation
public final Counter backgroundPullRequestsSuppressed; // PullMutationsRequest sends skipped by the per-coordinator-log cooldown
public final Counter backgroundPullRequestsFailed; // PullMutationsRequest sends that failed before transmission (overload, serialization, closed connection)
public final Histogram readSummarySize; // Read summary sizes
public final Gauge<Long> unreconciledMutationCount; // Number of unreconciled mutations
public final Gauge<Long> journalDiskSpaceUsed; // Size of MutationJournal on disk
Expand All @@ -55,6 +58,9 @@ private MutationTrackingMetrics()
{
broadcastOffsetsDiscovered = Metrics.counter(factory.createMetricName("BroadcastOffsetsDiscovered"));
writeTimeOffsetsDiscovered = Metrics.counter(factory.createMetricName("WriteTimeOffsetsDiscovered"));
backgroundPullRequestsSent = Metrics.counter(factory.createMetricName("BackgroundPullRequestsSent"));
backgroundPullRequestsSuppressed = Metrics.counter(factory.createMetricName("BackgroundPullRequestsSuppressed"));
backgroundPullRequestsFailed = Metrics.counter(factory.createMetricName("BackgroundPullRequestsFailed"));
readSummarySize = Metrics.histogram(factory.createMetricName("ReadSummarySize"), true);
unreconciledMutationCount = Metrics.register(
factory.createMetricName("UnreconciledMutationCount"),
Expand Down
126 changes: 116 additions & 10 deletions src/java/org/apache/cassandra/replication/MutationTrackingService.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,17 @@
import org.apache.cassandra.dht.Splitter;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.RequestFailure;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.locator.EndpointsForRange;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.metrics.MutationTrackingMetrics;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageFlag;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.NoPayload;
import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.repair.SyncTask;
import org.apache.cassandra.repair.SyncTasks;
Expand All @@ -89,6 +93,7 @@
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.ownership.ReplicaGroups;
import org.apache.cassandra.tcm.ownership.VersionedEndpoints;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MBeanWrapper;

Expand Down Expand Up @@ -255,6 +260,8 @@ private synchronized void startInternal(Function<ChangeListener, ClusterMetadata
if (!keyspaceShards.isEmpty() && !config.background_reconciliation_enabled)
logBackgroundReconciliationDisabledWarning(keyspaceShards.keySet());

warnIfCooldownBelowInterval();

offsetsBroadcaster.start();
offsetsPersister.start();
backgroundReconciler.start();
Expand Down Expand Up @@ -290,6 +297,7 @@ public void setMutationTrackingBackgroundReconciliationIntervalMilliseconds(long
logger.info("Setting mutation tracking background reconciliation interval from {} to {}",
config.background_reconciliation_interval, backgroundReconciliationInterval);
config.background_reconciliation_interval = backgroundReconciliationInterval;
warnIfCooldownBelowInterval();
}
}

Expand All @@ -299,6 +307,26 @@ public long getMutationTrackingBackgroundReconciliationIntervalMilliseconds()
return config.background_reconciliation_interval.toMilliseconds();
}

/**
 * Updates the request cooldown applied by background reconciliation.
 * A value equal to the current setting is ignored. Otherwise the change is logged, stored in the
 * live config, and the cooldown/interval relationship is re-checked.
 *
 * @param cooldownMilliseconds the new cooldown, in milliseconds
 */
@Override
public void setMutationTrackingBackgroundReconciliationRequestCooldownMilliseconds(long cooldownMilliseconds)
{
    DurationSpec.LongMillisecondsBound current = config.background_reconciliation_request_cooldown;
    if (current.toMilliseconds() == cooldownMilliseconds)
        return; // unchanged: nothing to log or apply

    DurationSpec.LongMillisecondsBound updated =
        new DurationSpec.LongMillisecondsBound(cooldownMilliseconds, TimeUnit.MILLISECONDS);
    logger.info("Setting mutation tracking background reconciliation request cooldown from {} to {}",
                current, updated);
    config.background_reconciliation_request_cooldown = updated;
    warnIfCooldownBelowInterval();
}

/**
 * @return the currently configured background reconciliation request cooldown, in milliseconds
 */
@Override
public long getMutationTrackingBackgroundReconciliationRequestCooldownMilliseconds()
{
    DurationSpec.LongMillisecondsBound cooldown = config.background_reconciliation_request_cooldown;
    return cooldown.toMilliseconds();
}

public void pauseOffsetBroadcast(boolean pause)
{
offsetsBroadcaster.pauseOffsetBroadcast(pause);
Expand Down Expand Up @@ -1465,8 +1493,21 @@ static int loadHostLogIdFromSystemTable()
return rows.one().getInt("host_log_id");
}

/**
 * Emits a warning when the configured request cooldown is shorter than the background
 * reconciliation scheduling interval. In that configuration the per-coordinator-log suppression
 * cannot carry over from one scheduler tick to the next.
 */
static void warnIfCooldownBelowInterval()
{
    long cooldownMs = config.background_reconciliation_request_cooldown.toMilliseconds();
    long intervalMs = config.background_reconciliation_interval.toMilliseconds();
    if (cooldownMs >= intervalMs)
        return; // cooldown covers at least one full interval: nothing to report

    logger.warn("Mutation tracking background reconciliation request cooldown ({} ms) is less than the " +
                "scheduling interval ({} ms); per-coordinator-log request suppression will not span " +
                "consecutive scheduler ticks.",
                cooldownMs, intervalMs);
}

private static class BackgroundReconciler
{
private final Map<CoordinatorLogId, Long> lastRequestedAt = new ConcurrentHashMap<>();

void start()
{
scheduleNext();
Expand All @@ -1492,16 +1533,28 @@ private void runAndReschedule()

void run()
{
MutationTrackingService.instance().forEachKeyspace(this::run);
}
long now = Clock.Global.nanoTime();
long cooldownNanos = TimeUnit.MILLISECONDS.toNanos(config.background_reconciliation_request_cooldown.toMilliseconds());

private void run(KeyspaceShards shards)
{
if (config.background_reconciliation_enabled)
shards.forEachShard(this::run);
{
MutationTrackingService.instance()
.forEachKeyspace(shards -> shards.forEachShard(shard -> run(shard, now, cooldownNanos)));
}
else if (!lastRequestedAt.isEmpty())
{
// When the background reconciliation is disabled, we clean up any pending
// requests being tracked for the cool down period
lastRequestedAt.clear();
}

if (!lastRequestedAt.isEmpty())
{
lastRequestedAt.values().removeIf(timestamp -> now - timestamp > cooldownNanos);
}
}

private void run(Shard shard)
private void run(Shard shard, long now, long cooldownNanos)
{
try
{
Expand All @@ -1510,8 +1563,16 @@ private void run(Shard shard)

for (Offsets.Immutable offsets : missing)
{
// Prefer pulling from the coordinator
int coordinatorHostId = offsets.logId().hostId();
CoordinatorLogId logId = offsets.logId();

Long lastRequested = lastRequestedAt.get(logId);
if (lastRequested != null && now - lastRequested < cooldownNanos)
{
MutationTrackingMetrics.instance().backgroundPullRequestsSuppressed.inc();
continue;
}

int coordinatorHostId = logId.hostId();
InetAddressAndPort coordinator = ClusterMetadata.current().directory.endpoint(new NodeId(coordinatorHostId));
InetAddressAndPort pullFrom = FailureDetector.instance.isAlive(coordinator)
? coordinator
Expand All @@ -1523,10 +1584,12 @@ private void run(Shard shard)
continue; // No reachable source
}

// TODO (expected): backoff, rate limits, per host and total
PullMutationsRequest request = new PullMutationsRequest(offsets, ActiveLogReconciler.Priority.REGULAR);
logger.trace("Requesting pull mutation request from replica {} for missing offset {}", pullFrom, offsets);
MessagingService.instance().send(Message.out(Verb.MT_PULL_MUTATIONS_REQ, request), pullFrom);
Message<PullMutationsRequest> message = Message.outWithFlag(Verb.MT_PULL_MUTATIONS_REQ, request, MessageFlag.CALL_BACK_ON_FAILURE);
MessagingService.instance().sendWithCallback(message, pullFrom, new PullRequestFailureCallback(logId, now));
lastRequestedAt.put(logId, now);
MutationTrackingMetrics.instance().backgroundPullRequestsSent.inc();
}
}
catch (Throwable throwable)
Expand All @@ -1550,6 +1613,49 @@ private InetAddressAndPort findAliveReplica(Shard shard, int excludeHostId)
}
return null;
}

/**
 * Callback attached to background MT_PULL_MUTATIONS_REQ sends so that a failed send can release
 * its cooldown entry early.
 *
 * The verb is one-way, so no response ever arrives: {@link #onResponse} is a no-op, and the
 * TIMEOUT reported once {@code readTimeout} elapses is ignored as expected. Any other failure
 * reason (queue overload, serialization error, closed connection) removes the cooldown entry
 * recorded for this send, letting the next reconciliation tick retry without waiting out the
 * cooldown.
 */
private final class PullRequestFailureCallback implements RequestCallback<NoPayload>
{
    private final CoordinatorLogId logId;
    private final long sentAt;

    PullRequestFailureCallback(CoordinatorLogId logId, long sentAt)
    {
        this.logId = logId;
        this.sentAt = sentAt;
    }

    @Override
    public void onResponse(Message<NoPayload> msg)
    {
        // Intentionally empty: MT_PULL_MUTATIONS_REQ is one-way, so no response is expected.
    }

    @Override
    public boolean invokeOnFailure()
    {
        return true;
    }

    @Override
    public void onFailure(InetAddressAndPort from, RequestFailure failure)
    {
        // TIMEOUT is the normal outcome for a one-way verb; only other reasons count as failures.
        if (failure.reason != RequestFailureReason.TIMEOUT)
        {
            // Conditional remove: drops the entry only if it still holds this send's timestamp,
            // so an entry written by a newer reconcile pass is left untouched.
            lastRequestedAt.remove(logId, sentAt);
            MutationTrackingMetrics.instance().backgroundPullRequestsFailed.inc();
        }
    }
}
}

// TODO (later): a more intelligent heuristic for offsets included in broadcasts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,17 @@ public interface MutationTrackingServiceMBean
* @return the interval, in milliseconds, in which the background reconciliation runs when enabled
*/
long getMutationTrackingBackgroundReconciliationIntervalMilliseconds();

/**
* Sets the minimum delay (in milliseconds) between successive pull requests issued by
* background reconciliation for the same coordinator log.
*
* @param cooldownMilliseconds the cooldown value in milliseconds
*/
void setMutationTrackingBackgroundReconciliationRequestCooldownMilliseconds(long cooldownMilliseconds);

/**
* @return the per-coordinator-log request cooldown, in milliseconds, used by background reconciliation
*/
long getMutationTrackingBackgroundReconciliationRequestCooldownMilliseconds();
}
10 changes: 10 additions & 0 deletions src/java/org/apache/cassandra/tools/NodeProbe.java
Original file line number Diff line number Diff line change
Expand Up @@ -2879,6 +2879,16 @@ public void setMutationTrackingBackgroundReconciliationIntervalMilliseconds(long
{
mutationTrackingProxy.setMutationTrackingBackgroundReconciliationIntervalMilliseconds(intervalMilliseconds);
}

/**
 * Reads the background reconciliation request cooldown by delegating to the mutation tracking proxy.
 *
 * @return the cooldown reported by the proxy, in milliseconds
 */
public long getMutationTrackingBackgroundReconciliationRequestCooldownMilliseconds()
{
return mutationTrackingProxy.getMutationTrackingBackgroundReconciliationRequestCooldownMilliseconds();
}

/**
 * Sets the background reconciliation request cooldown by delegating to the mutation tracking proxy.
 *
 * @param cooldownMilliseconds the new cooldown, in milliseconds; passed through unchanged
 */
public void setMutationTrackingBackgroundReconciliationRequestCooldownMilliseconds(long cooldownMilliseconds)
{
mutationTrackingProxy.setMutationTrackingBackgroundReconciliationRequestCooldownMilliseconds(cooldownMilliseconds);
}
}

class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, ColumnFamilyStoreMBean>>
Expand Down
6 changes: 5 additions & 1 deletion src/java/org/apache/cassandra/tools/nodetool/MTAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public void execute(NodeProbe probe)

out.println("background_reconciliation_enabled: " + probe.getMutationTrackingBackgroundReconciliationEnabled());
out.println("background_reconciliation_interval_ms: " + probe.getMutationTrackingBackgroundReconciliationIntervalMilliseconds());
out.println("background_reconciliation_request_cooldown_ms: " + probe.getMutationTrackingBackgroundReconciliationRequestCooldownMilliseconds());
}
}

Expand All @@ -73,7 +74,7 @@ public static class SetConfig extends AbstractCommand

@Parameters(index = "0", arity = "0..1", description = { "Mutation tracking param type.",
"Possible parameters: " +
"[background_reconciliation_enabled|background_reconciliation_interval_ms]" })
"[background_reconciliation_enabled|background_reconciliation_interval_ms|background_reconciliation_request_cooldown_ms]" })
public String paramType;

@Parameters(index = "1", description = "Mutation tracking param value", arity = "0..1")
Expand Down Expand Up @@ -104,6 +105,9 @@ public void execute(NodeProbe probe)
case "background_reconciliation_interval_ms":
probe.setMutationTrackingBackgroundReconciliationIntervalMilliseconds(Long.parseLong(value));
break;
case "background_reconciliation_request_cooldown_ms":
probe.setMutationTrackingBackgroundReconciliationRequestCooldownMilliseconds(Long.parseLong(value));
break;
default:
throw new IllegalArgumentException("Unknown parameter: " + type);
}
Expand Down
Loading