From b0e3efb42e4a7765a3781d854b4aacca028e4c48 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Tue, 12 May 2026 09:28:43 -0700 Subject: [PATCH 1/5] test --- .../test/tracking/TrackedImportTransferTest.java | 6 +++--- .../distributed/test/tracking/TrackedTransferTestBase.java | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedImportTransferTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedImportTransferTest.java index 3c279afcd5fa..0b9e33360700 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedImportTransferTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedImportTransferTest.java @@ -85,14 +85,14 @@ public void importHappyPath() throws Throwable assertCompaction(cluster, keyspace, cluster, TRANSFERS_EXIST, TRANSFERS_EMPTY); // Run after compaction, to enforce offset persistence + broadcast - assertSummary(cluster, keyspace, summary -> { + /*assertSummary(cluster, keyspace, summary -> { assertThat(summary).satisfies(s -> { assert s.reconciledIds() == 1; assert s.unreconciledIds() == 0; }); - }); + });*/ - assertLocalSelect(cluster, keyspace, rows -> assertRows(rows, row(1, 1))); + /*assertLocalSelect(cluster, keyspace, rows -> assertRows(rows, row(1, 1)));*/ } @Test diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedTransferTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedTransferTestBase.java index 002dcef5f95d..ddecc8f31c29 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedTransferTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedTransferTestBase.java @@ -329,6 +329,7 @@ protected static void doImport(Cluster cluster, IInvokableInstance target, Consu try (CQLSSTableWriter writer = builder.build()) { writer.addRow(IMPORT_PK, 1); + writer.addRow(3, 1); } // empty From 01fffd8ec2a93edac6206b2cc8c9c9e349dc7aac Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Tue, 12 May 2026 13:28:15 -0700 Subject: [PATCH 2/5] fix --- .../io/sstable/RangeAwareSSTableWriter.java | 3 +- .../replication/TrackedImportTransfer.java | 7 ++- .../replication/TrackedImportTransfers.java | 14 ++++- .../tracking/TrackedImportTransferTest.java | 51 +++++++++++++++++-- .../tracking/TrackedTransferTestBase.java | 14 ++--- 5 files changed, 76 insertions(+), 13 deletions(-) diff --git a/src/java/org/apache/cassandra/io/sstable/RangeAwareSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/RangeAwareSSTableWriter.java index a4c15ff53e01..97b6273443d1 100644 --- a/src/java/org/apache/cassandra/io/sstable/RangeAwareSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/RangeAwareSSTableWriter.java @@ -146,7 +146,8 @@ public Collection finished() public SSTableMultiWriter setOpenResult(boolean openResult) { finishedWriters.forEach((w) -> w.setOpenResult(openResult)); - currentWriter.setOpenResult(openResult); + if (currentWriter != null) + currentWriter.setOpenResult(openResult); return this; } diff --git a/src/java/org/apache/cassandra/replication/TrackedImportTransfer.java b/src/java/org/apache/cassandra/replication/TrackedImportTransfer.java index ee61d47c5fd3..b0225263c7ac 100644 --- a/src/java/org/apache/cassandra/replication/TrackedImportTransfer.java +++ b/src/java/org/apache/cassandra/replication/TrackedImportTransfer.java @@ -88,19 +88,22 @@ public class TrackedImportTransfer extends CoordinatedTransfer final Collection sstables; private final ConsistencyLevel cl; + final Map> positionForSSTables; @VisibleForTesting TrackedImportTransfer(Range range, MutationId id) { super(id, null, range); this.sstables = Collections.emptyList(); + this.positionForSSTables = Collections.emptyMap(); this.cl = null; } - TrackedImportTransfer(String keyspace, Range range, Participants participants, Collection sstables, ConsistencyLevel cl, Supplier nextId) + TrackedImportTransfer(String keyspace, Range range, Participants participants, Collection sstables, Map> positionForSSTables, ConsistencyLevel cl, Supplier nextId) { super(nextId.get(), participants, keyspace, range); this.sstables = sstables; + this.positionForSSTables = positionForSSTables; this.cl = cl; ClusterMetadata cm = ClusterMetadata.current(); @@ -361,7 +364,7 @@ private SingleTransferResult streamTask(InetAddressAndPort to) throws StreamExce for (SSTableReader sstable : sstables) { List> ranges = Collections.singletonList(range); - List positions = sstable.getPositionsForRanges(ranges); + List positions = positionForSSTables.get(sstable); long estimatedKeys = sstable.estimatedKeysForRanges(ranges); OutgoingStream stream = new CassandraOutgoingFile(StreamOperation.IMPORT, sstable.ref(), positions, ranges, estimatedKeys); plan.transferStreams(to, Collections.singleton(stream)); diff --git a/src/java/org/apache/cassandra/replication/TrackedImportTransfers.java b/src/java/org/apache/cassandra/replication/TrackedImportTransfers.java index 250c240e0ae5..b2974df4c6c8 100644 --- a/src/java/org/apache/cassandra/replication/TrackedImportTransfers.java +++ b/src/java/org/apache/cassandra/replication/TrackedImportTransfers.java @@ -22,8 +22,11 @@ import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.lifecycle.SSTableIntervalTree; @@ -67,10 +70,19 @@ static TrackedImportTransfers create(String keyspace, MutationTrackingService.Ke shards.forEachShard(shard -> { Range range = shard.tokenRange(); Collection sstablesForRange = intervals.search(Interval.create(range.left.minKeyBound(), range.right.maxKeyBound())); + List> ranges = Collections.singletonList(range); + Map> positionForSSTables = new HashMap<>(); + sstablesForRange.removeIf(sstable -> { + List position = sstable.getPositionsForRanges(ranges); + if (!position.isEmpty()) + positionForSSTables.put(sstable, position); + return position.isEmpty(); + }); + if (sstablesForRange.isEmpty()) return; - TrackedImportTransfer transfer = new TrackedImportTransfer(keyspace, range, shard.participants, sstablesForRange, cl, shard::nextId); + TrackedImportTransfer transfer = new TrackedImportTransfer(keyspace, range, shard.participants, sstablesForRange, positionForSSTables, cl, shard::nextId); transfers.add(transfer); }); return new TrackedImportTransfers(transfers); diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedImportTransferTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedImportTransferTest.java index 0b9e33360700..c0ad3d9c9640 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedImportTransferTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedImportTransferTest.java @@ -19,13 +19,18 @@ package org.apache.cassandra.distributed.test.tracking; import java.io.IOException; +import java.nio.file.Files; import java.time.Duration; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import com.google.common.util.concurrent.Uninterruptibles; + +import org.assertj.core.api.Assertions; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -36,7 +41,9 @@ import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.shared.AssertUtils; import org.apache.cassandra.distributed.test.sai.SAIUtil; +import org.apache.cassandra.io.sstable.CQLSSTableWriter; import org.apache.cassandra.io.util.File; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ownership.DataPlacement; @@ -85,14 +92,14 @@ public void importHappyPath() throws Throwable assertCompaction(cluster, keyspace, cluster, TRANSFERS_EXIST, TRANSFERS_EMPTY); // Run after compaction, to enforce offset persistence + broadcast - /*assertSummary(cluster, keyspace, summary -> { + assertSummary(cluster, keyspace, summary -> { assertThat(summary).satisfies(s -> { assert s.reconciledIds() == 1; assert s.unreconciledIds() == 0; }); - });*/ + }); - /*assertLocalSelect(cluster, keyspace, rows -> assertRows(rows, row(1, 1)));*/ + assertLocalSelect(cluster, keyspace, rows -> assertRows(rows, row(1, 1))); } @Test @@ -199,4 +206,42 @@ public void importOutOfRange() throws Throwable }); } } + + @Test + public void importIntervalTreeFalsePositive() throws IOException + { + String keyspace = "interval_tree_false_positive"; + cluster.schemaChange("CREATE KEYSPACE " + keyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3} AND replication_type='tracked';"); + cluster.schemaChange("CREATE TABLE " + tableWithKeyspace(keyspace) + " (k BLOB PRIMARY KEY, v INT)"); + + String file = Files.createTempDirectory(TrackedTransferTestBase.class.getSimpleName()).toString(); + + // Needs to run outside of instance executor because creates schema + CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder() + .forTable("CREATE TABLE " + tableWithKeyspace(keyspace) + " (k BLOB PRIMARY KEY, v INT)") + .inDirectory(file) + .using("INSERT INTO " + tableWithKeyspace(keyspace) + " (k, v) " + "VALUES (?, ?)"); + + try (CQLSSTableWriter writer = builder.build()) + { + writer.addRow(KEY_100, 1); + writer.addRow(KEY_300, 1); + } + + // empty + assertLocalSelect(cluster, keyspace, AssertUtils::assertRows); + + List failed = cluster.get(1).callOnInstance(() -> { + ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(keyspace, TABLE); + Set paths = Set.of(file); + logger.info("Importing SSTables {}", paths); + return cfs.importNewSSTables(paths, true, true, true, true, true, true, true); + }); + + // Sleep for a while to make sure import completes + Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS); + + Assertions.assertThat(failed).isEmpty(); + assertLocalSelect(cluster, keyspace, rows -> assertRows(rows, row(KEY_100, 1), row(KEY_300, 1))); + } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedTransferTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedTransferTestBase.java index ddecc8f31c29..c292b8ed79da 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedTransferTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedTransferTestBase.java @@ -124,6 +124,10 @@ public abstract class TrackedTransferTestBase extends TestBaseImpl protected final static Token TOKEN_201 = new Murmur3Partitioner.LongToken(TOKEN_VALUE_201); protected final static ByteBuffer KEY_201 = Murmur3Partitioner.LongToken.keyForToken(TOKEN_201.getLongValue()); + protected final static long TOKEN_VALUE_300 = 3074457345618258602L; + protected final static Token TOKEN_300 = new Murmur3Partitioner.LongToken(TOKEN_VALUE_300); + protected final static ByteBuffer KEY_300 = Murmur3Partitioner.LongToken.keyForToken(TOKEN_300.getLongValue()); + protected final static Range SHARD_ALIGNED_RANGE_2 = new Range<>(new Murmur3Partitioner.LongToken(TOKEN_VALUE_200 - 10), new Murmur3Partitioner.LongToken(TOKEN_VALUE_200 + 10)); static @@ -136,6 +140,9 @@ public abstract class TrackedTransferTestBase extends TestBaseImpl reversed = Murmur3Partitioner.instance.decorateKey(KEY_201); Assertions.assertThat(reversed.getToken()).isEqualTo(TOKEN_201); + + reversed = Murmur3Partitioner.instance.decorateKey(KEY_300); + Assertions.assertThat(reversed.getToken()).isEqualTo(TOKEN_300); } protected static Cluster cluster() throws IOException @@ -313,7 +320,7 @@ protected static void doImport(Cluster cluster, IInvokableInstance target, Strin protected static void doImport(Cluster cluster, IInvokableInstance target, Consumer> onFailedDirs, String keyspace, @Nullable String createIndexCql) throws IOException { - String file = Files.createTempDirectory(MutationTrackingTest.class.getSimpleName()).toString(); + String file = Files.createTempDirectory(TrackedTransferTestBase.class.getSimpleName()).toString(); // Needs to run outside of instance executor because creates schema CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder() @@ -329,7 +336,6 @@ protected static void doImport(Cluster cluster, IInvokableInstance target, Consu try (CQLSSTableWriter writer = builder.build()) { writer.addRow(IMPORT_PK, 1); - writer.addRow(3, 1); } // empty @@ -356,10 +362,6 @@ protected static void assertLocalSelect(Iterable validate, S { for (IInvokableInstance instance : validate) { - { - Object[][] rows = instance.executeInternal(withKeyspace("SELECT * FROM %s." + TABLE + " WHERE k = 1", keyspace)); - onRows.accept(rows); - } { Object[][] rows = instance.executeInternal(withKeyspace("SELECT * FROM %s." + TABLE, keyspace)); onRows.accept(rows); From 5eac3e52d16e8455d321ce3d3debc573b29f2164 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Wed, 3 Jun 2026 11:13:23 -0700 Subject: [PATCH 3/5] reorder so local cleanup always occurs --- .../org/apache/cassandra/replication/TrackedImportTransfer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/java/org/apache/cassandra/replication/TrackedImportTransfer.java b/src/java/org/apache/cassandra/replication/TrackedImportTransfer.java index b0225263c7ac..8c1b521a49b3 100644 --- a/src/java/org/apache/cassandra/replication/TrackedImportTransfer.java +++ b/src/java/org/apache/cassandra/replication/TrackedImportTransfer.java @@ -231,8 +231,8 @@ private void maybeCleanupFailedStreams(Throwable cause) if (!purgeable) return; - notifyFailure(); TransferTrackingService.instance().scheduleCleanup(); + notifyFailure(); } catch (Throwable t) { From 003baa1ea69dd5e78cd5f8e827a5def4e13d029d Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Wed, 3 Jun 2026 11:17:56 -0700 Subject: [PATCH 4/5] test fix --- .../distributed/test/tracking/TrackedImportFailureTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedImportFailureTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedImportFailureTest.java index eb1624d77375..9d6d0a8f7273 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedImportFailureTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedImportFailureTest.java @@ -351,10 +351,11 @@ public void importReplicaDown() throws Throwable createSchema(cluster, keyspace); Iterable down = Collections.singleton(cluster.get(3)); - Iterable up = cluster.stream().filter(instance -> instance != down).collect(Collectors.toList()); for (IInvokableInstance instance : down) instance.shutdown().get(); + Iterable up = cluster.stream().filter(instance -> !instance.isShutdown()).collect(Collectors.toList()); + doImport(cluster, keyspace); cluster.get(3).startup(); From e3804cb207da29758b72125dc4eed591c0883218 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Wed, 17 Jun 2026 16:24:25 -0700 Subject: [PATCH 5/5] add comment --- .../test/tracking/TrackedImportTransferTest.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedImportTransferTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedImportTransferTest.java index c0ad3d9c9640..8f92ebc2f3de 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedImportTransferTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedImportTransferTest.java @@ -210,6 +210,7 @@ public void importOutOfRange() throws Throwable @Test public void importIntervalTreeFalsePositive() throws IOException { + // See CASSANDRA-21470 String keyspace = "interval_tree_false_positive"; cluster.schemaChange("CREATE KEYSPACE " + keyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3} AND replication_type='tracked';"); cluster.schemaChange("CREATE TABLE " + tableWithKeyspace(keyspace) + " (k BLOB PRIMARY KEY, v INT)"); @@ -222,10 +223,12 @@ public void importIntervalTreeFalsePositive() throws IOException .inDirectory(file) .using("INSERT INTO " + tableWithKeyspace(keyspace) + " (k, v) " + "VALUES (?, ?)"); + // For shard (-3074457345618258603,3074457345618258601], this SSTable + // intersects it, but does not contain any values in between the shard. try (CQLSSTableWriter writer = builder.build()) { - writer.addRow(KEY_100, 1); - writer.addRow(KEY_300, 1); + writer.addRow(KEY_100, 1); // -4074457345618258601L + writer.addRow(KEY_300, 1); // 3074457345618258602L } // empty