Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ public Collection<SSTableReader> finished()
public SSTableMultiWriter setOpenResult(boolean openResult)
{
finishedWriters.forEach((w) -> w.setOpenResult(openResult));
currentWriter.setOpenResult(openResult);
if (currentWriter != null)
currentWriter.setOpenResult(openResult);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,22 @@ public class TrackedImportTransfer extends CoordinatedTransfer

final Collection<SSTableReader> sstables;
private final ConsistencyLevel cl;
final Map<SSTableReader, List<SSTableReader.PartitionPositionBounds>> positionForSSTables;

@VisibleForTesting
TrackedImportTransfer(Range<Token> range, MutationId id)
{
super(id, null, range);
this.sstables = Collections.emptyList();
this.positionForSSTables = Collections.emptyMap();
this.cl = null;
}

TrackedImportTransfer(String keyspace, Range<Token> range, Participants participants, Collection<SSTableReader> sstables, ConsistencyLevel cl, Supplier<MutationId> nextId)
TrackedImportTransfer(String keyspace, Range<Token> range, Participants participants, Collection<SSTableReader> sstables, Map<SSTableReader, List<SSTableReader.PartitionPositionBounds>> positionForSSTables, ConsistencyLevel cl, Supplier<MutationId> nextId)
{
super(nextId.get(), participants, keyspace, range);
this.sstables = sstables;
this.positionForSSTables = positionForSSTables;
this.cl = cl;

ClusterMetadata cm = ClusterMetadata.current();
Expand Down Expand Up @@ -228,8 +231,8 @@ private void maybeCleanupFailedStreams(Throwable cause)
if (!purgeable)
return;

notifyFailure();
TransferTrackingService.instance().scheduleCleanup();
notifyFailure();
}
catch (Throwable t)
{
Expand Down Expand Up @@ -361,7 +364,7 @@ private SingleTransferResult streamTask(InetAddressAndPort to) throws StreamExce
for (SSTableReader sstable : sstables)
{
List<Range<Token>> ranges = Collections.singletonList(range);
List<SSTableReader.PartitionPositionBounds> positions = sstable.getPositionsForRanges(ranges);
List<SSTableReader.PartitionPositionBounds> positions = positionForSSTables.get(sstable);
long estimatedKeys = sstable.estimatedKeysForRanges(ranges);
OutgoingStream stream = new CassandraOutgoingFile(StreamOperation.IMPORT, sstable.ref(), positions, ranges, estimatedKeys);
plan.transferStreams(to, Collections.singleton(stream));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
Expand Down Expand Up @@ -67,10 +70,19 @@ static TrackedImportTransfers create(String keyspace, MutationTrackingService.Ke
shards.forEachShard(shard -> {
Range<Token> range = shard.tokenRange();
Collection<SSTableReader> sstablesForRange = intervals.search(Interval.create(range.left.minKeyBound(), range.right.maxKeyBound()));
List<Range<Token>> ranges = Collections.singletonList(range);
Map<SSTableReader, List<SSTableReader.PartitionPositionBounds>> positionForSSTables = new HashMap<>();
sstablesForRange.removeIf(sstable -> {
List<SSTableReader.PartitionPositionBounds> position = sstable.getPositionsForRanges(ranges);
if (!position.isEmpty())
positionForSSTables.put(sstable, position);
return position.isEmpty();
});

if (sstablesForRange.isEmpty())
return;

TrackedImportTransfer transfer = new TrackedImportTransfer(keyspace, range, shard.participants, sstablesForRange, cl, shard::nextId);
TrackedImportTransfer transfer = new TrackedImportTransfer(keyspace, range, shard.participants, sstablesForRange, positionForSSTables, cl, shard::nextId);
transfers.add(transfer);
});
return new TrackedImportTransfers(transfers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,10 +351,11 @@ public void importReplicaDown() throws Throwable
createSchema(cluster, keyspace);

Iterable<IInvokableInstance> down = Collections.singleton(cluster.get(3));
Iterable<IInvokableInstance> up = cluster.stream().filter(instance -> instance != down).collect(Collectors.toList());
for (IInvokableInstance instance : down)
instance.shutdown().get();

Iterable<IInvokableInstance> up = cluster.stream().filter(instance -> !instance.isShutdown()).collect(Collectors.toList());

doImport(cluster, keyspace);

cluster.get(3).startup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@
package org.apache.cassandra.distributed.test.tracking;

import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import com.google.common.util.concurrent.Uninterruptibles;

import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
Expand All @@ -36,7 +41,9 @@
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.shared.AssertUtils;
import org.apache.cassandra.distributed.test.sai.SAIUtil;
import org.apache.cassandra.io.sstable.CQLSSTableWriter;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ownership.DataPlacement;
Expand Down Expand Up @@ -199,4 +206,45 @@ public void importOutOfRange() throws Throwable
});
}
}

@Test
public void importIntervalTreeFalsePositive() throws IOException
{
// See CASSANDRA-21470
String keyspace = "interval_tree_false_positive";
cluster.schemaChange("CREATE KEYSPACE " + keyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3} AND replication_type='tracked';");
cluster.schemaChange("CREATE TABLE " + tableWithKeyspace(keyspace) + " (k BLOB PRIMARY KEY, v INT)");

String file = Files.createTempDirectory(TrackedTransferTestBase.class.getSimpleName()).toString();

// Needs to run outside of instance executor because creates schema
CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder()
.forTable("CREATE TABLE " + tableWithKeyspace(keyspace) + " (k BLOB PRIMARY KEY, v INT)")
.inDirectory(file)
.using("INSERT INTO " + tableWithKeyspace(keyspace) + " (k, v) " + "VALUES (?, ?)");

// For shard (-3074457345618258603,3074457345618258601], this SSTable
// intersects it, but does not contain any values in between the shard.
try (CQLSSTableWriter writer = builder.build())
{
writer.addRow(KEY_100, 1); // -4074457345618258601L
writer.addRow(KEY_300, 1); // 3074457345618258602L
}

// empty
assertLocalSelect(cluster, keyspace, AssertUtils::assertRows);

List<String> failed = cluster.get(1).callOnInstance(() -> {
ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(keyspace, TABLE);
Set<String> paths = Set.of(file);
logger.info("Importing SSTables {}", paths);
return cfs.importNewSSTables(paths, true, true, true, true, true, true, true);
});

// Sleep for a while to make sure import completes
Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);

Assertions.assertThat(failed).isEmpty();
assertLocalSelect(cluster, keyspace, rows -> assertRows(rows, row(KEY_100, 1), row(KEY_300, 1)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ public abstract class TrackedTransferTestBase extends TestBaseImpl
protected final static Token TOKEN_201 = new Murmur3Partitioner.LongToken(TOKEN_VALUE_201);
protected final static ByteBuffer KEY_201 = Murmur3Partitioner.LongToken.keyForToken(TOKEN_201.getLongValue());

protected final static long TOKEN_VALUE_300 = 3074457345618258602L;
protected final static Token TOKEN_300 = new Murmur3Partitioner.LongToken(TOKEN_VALUE_300);
protected final static ByteBuffer KEY_300 = Murmur3Partitioner.LongToken.keyForToken(TOKEN_300.getLongValue());

protected final static Range<Token> SHARD_ALIGNED_RANGE_2 = new Range<>(new Murmur3Partitioner.LongToken(TOKEN_VALUE_200 - 10), new Murmur3Partitioner.LongToken(TOKEN_VALUE_200 + 10));

static
Expand All @@ -136,6 +140,9 @@ public abstract class TrackedTransferTestBase extends TestBaseImpl

reversed = Murmur3Partitioner.instance.decorateKey(KEY_201);
Assertions.assertThat(reversed.getToken()).isEqualTo(TOKEN_201);

reversed = Murmur3Partitioner.instance.decorateKey(KEY_300);
Assertions.assertThat(reversed.getToken()).isEqualTo(TOKEN_300);
}

protected static Cluster cluster() throws IOException
Expand Down Expand Up @@ -313,7 +320,7 @@ protected static void doImport(Cluster cluster, IInvokableInstance target, Strin

protected static void doImport(Cluster cluster, IInvokableInstance target, Consumer<List<String>> onFailedDirs, String keyspace, @Nullable String createIndexCql) throws IOException
{
String file = Files.createTempDirectory(MutationTrackingTest.class.getSimpleName()).toString();
String file = Files.createTempDirectory(TrackedTransferTestBase.class.getSimpleName()).toString();

// Needs to run outside of instance executor because creates schema
CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder()
Expand Down Expand Up @@ -355,10 +362,6 @@ protected static void assertLocalSelect(Iterable<IInvokableInstance> validate, S
{
for (IInvokableInstance instance : validate)
{
{
Object[][] rows = instance.executeInternal(withKeyspace("SELECT * FROM %s." + TABLE + " WHERE k = 1", keyspace));
onRows.accept(rows);
}
{
Object[][] rows = instance.executeInternal(withKeyspace("SELECT * FROM %s." + TABLE, keyspace));
onRows.accept(rows);
Expand Down