Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/db/SystemPeersValidator.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.membership.Location;
import org.apache.cassandra.tcm.membership.NodeAddresses;
import org.apache.cassandra.tcm.membership.NodeId;
Expand Down Expand Up @@ -68,6 +69,8 @@ public final class SystemPeersValidator

public static void validateAndRepair(ClusterMetadata metadata)
{
if (metadata.epoch.isBefore(Epoch.FIRST))
return;
Map<InetAddressAndPort, UntypedResultSet.Row> peersV2Rows = getPeersV2Rows();
Map<InetAddress, UntypedResultSet.Row> peersRows = getPeersRows();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.cassandra.tcm.transformations.cms.PreInitialize;
import org.apache.cassandra.utils.JVMStabilityInspector;

import static org.apache.cassandra.schema.SchemaConstants.METADATA_KEYSPACE_NAME;
import static org.apache.cassandra.tcm.Epoch.FIRST;

public final class DistributedMetadataLogKeyspace
Expand All @@ -65,7 +66,7 @@ private DistributedMetadataLogKeyspace(){}
*/
public static final long GENERATION = 0;

public static final TableId LOG_TABLE_ID = TableId.unsafeDeterministic(SchemaConstants.METADATA_KEYSPACE_NAME, TABLE_NAME);
public static final TableId LOG_TABLE_ID = TableId.unsafeDeterministic(METADATA_KEYSPACE_NAME, TABLE_NAME);
public static final String LOG_TABLE_CQL = "CREATE TABLE %s.%s ("
+ "epoch bigint,"
+ "entry_id bigint,"
Expand All @@ -80,16 +81,16 @@ private DistributedMetadataLogKeyspace(){}
"compaction_window_size","1")))
.build();

public static boolean initialize() throws IOException
public static boolean insertPreInitialize(PreInitialize preInit) throws IOException
{
try
{
String init = String.format("INSERT INTO %s.%s (epoch, transformation, kind, entry_id) " +
"VALUES(?, ?, ?, ?) " +
"IF NOT EXISTS", SchemaConstants.METADATA_KEYSPACE_NAME, TABLE_NAME);
"IF NOT EXISTS", METADATA_KEYSPACE_NAME, TABLE_NAME);
UntypedResultSet result = QueryProcessor.execute(init, ConsistencyLevel.QUORUM,
FIRST.getEpoch(),
Transformation.Kind.PRE_INITIALIZE_CMS.toVersionedBytes(PreInitialize.blank()),
Transformation.Kind.PRE_INITIALIZE_CMS.toVersionedBytes(preInit),
Transformation.Kind.PRE_INITIALIZE_CMS.id,
Entry.Id.NONE.entryId);

Expand Down Expand Up @@ -124,21 +125,25 @@ public static boolean tryCommit(Entry.Id entryId,
{
try
{
if (previousEpoch.is(FIRST) && !initialize())
// log is not initialized yet this is unexpected
if (previousEpoch.isBefore(FIRST))
{
logger.warn("Previous epoch {} indicates that the {} has not been initialized yet, " +
"not committing entry {}/{} at epoch {}",
previousEpoch, METADATA_KEYSPACE_NAME, entryId, transform, nextEpoch);
return false;
}

// TODO get lowest supported metadata version from ClusterMetadata
ByteBuffer serializedEvent = transform.kind().toVersionedBytes(transform);

ByteBuffer serializedTransform = transform.kind().toVersionedBytes(transform);
String query = String.format("INSERT INTO %s.%s (epoch, entry_id, transformation, kind) " +
"VALUES (?, ?, ?, ?) " +
"IF NOT EXISTS;",
SchemaConstants.METADATA_KEYSPACE_NAME, TABLE_NAME);
METADATA_KEYSPACE_NAME, TABLE_NAME);
UntypedResultSet result = QueryProcessor.execute(query,
ConsistencyLevel.QUORUM,
nextEpoch.getEpoch(),
entryId.entryId,
serializedEvent,
serializedTransform,
transform.kind().id);

return result.one().getBoolean("[applied]");
Expand Down Expand Up @@ -186,7 +191,7 @@ public EntryHolder getEntries(Epoch since) throws IOException
// note that we want all entries with epoch >= since - but since we use a reverse partitioner, we actually
// want all entries where the token is less than token(since)
UntypedResultSet resultSet = execute(String.format("SELECT epoch, kind, transformation, entry_id FROM %s.%s WHERE token(epoch) <= token(?)",
SchemaConstants.METADATA_KEYSPACE_NAME, TABLE_NAME),
METADATA_KEYSPACE_NAME, TABLE_NAME),
consistencyLevel, since.getEpoch());
EntryHolder entryHolder = new EntryHolder(since);
for (UntypedResultSet.Row row : resultSet)
Expand Down Expand Up @@ -237,15 +242,15 @@ private static UntypedResultSet execute(String query, ConsistencyLevel cl, Objec

private static TableMetadata.Builder parse(String cql, String table, String description)
{
return CreateTableStatement.parse(String.format(cql, SchemaConstants.METADATA_KEYSPACE_NAME, table), SchemaConstants.METADATA_KEYSPACE_NAME)
return CreateTableStatement.parse(String.format(cql, METADATA_KEYSPACE_NAME, table), METADATA_KEYSPACE_NAME)
.id(LOG_TABLE_ID)
.epoch(FIRST)
.comment(description);
}

public static KeyspaceMetadata initialMetadata(Set<String> knownDatacenters)
{
return KeyspaceMetadata.create(SchemaConstants.METADATA_KEYSPACE_NAME, new KeyspaceParams(true, ReplicationParams.simpleMeta(1, knownDatacenters), FastPathStrategy.simple()), Tables.of(Log));
return KeyspaceMetadata.create(METADATA_KEYSPACE_NAME, new KeyspaceParams(true, ReplicationParams.simpleMeta(1, knownDatacenters), FastPathStrategy.simple()), Tables.of(Log));
}

public static KeyspaceMetadata initialMetadata(String datacenter)
Expand Down
63 changes: 25 additions & 38 deletions src/java/org/apache/cassandra/schema/DistributedSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -65,25 +64,6 @@ public static DistributedSchema empty()
return new DistributedSchema(Keyspaces.none(), Epoch.EMPTY);
}

public static DistributedSchema first(Set<String> knownDatacenters)
{
// During upgrades from pre-6.0 versions, the replication params of the system_cluster_metadata
// keyspace using one of the existing DCs. This is so that this keyspace does not cause issues
// for tooling, clients or control plane systems which may inspect schema and have specific
// expectations about DC layout. This keyspace is unused until the CMS is initialized.
// For new clusters which start out on 6.0 or later, this is not necessary to the initial
// replication params use a empty string for the placeholder DC name.

// During CMS initialization, the replication of this keyspace will be set for real using
// the DC of the first node to become a CMS member. This happens in the PreInitialize
// transformation when executed on the first CMS member.

if (knownDatacenters.isEmpty())
knownDatacenters = Collections.singleton("");

return new DistributedSchema(Keyspaces.of(DistributedMetadataLogKeyspace.initialMetadata(knownDatacenters)), Epoch.FIRST);
}

private static ImmutableMap<TableId, TableMetadata> keyspacesToTableMap(Keyspaces keyspaces)
{
ImmutableMap.Builder<TableId, TableMetadata> builder = ImmutableMap.builder();
Expand Down Expand Up @@ -159,28 +139,23 @@ public boolean hasAccordKeyspaces()
* @deprecated since TCM, used on upgrade from gossip to populate system schema tables with the correct generation
*/
@Deprecated(since = "TCM")
public static List<Pair<KeyspaceMetadata, Long>> distributedKeyspacesWithGeneration(Set<String> knownDatacenters)
public static List<Pair<KeyspaceMetadata, Long>> distributedKeyspacesWithGeneration()
{
return ImmutableList.of(Pair.create(DistributedMetadataLogKeyspace.initialMetadata(knownDatacenters), DistributedMetadataLogKeyspace.GENERATION),
Pair.create(TraceKeyspace.metadata(), TraceKeyspace.GENERATION),
return ImmutableList.of(Pair.create(TraceKeyspace.metadata(), TraceKeyspace.GENERATION),
Pair.create(SystemDistributedKeyspace.metadata(), SystemDistributedKeyspace.GENERATION),
Pair.create(AuthKeyspace.metadata(),AuthKeyspace.GENERATION));
}

public static DistributedSchema fromSystemTables(Keyspaces keyspaces, Set<String> knownDatacenters)
public static DistributedSchema fromSystemTables(Keyspaces keyspaces)
{
if (!keyspaces.containsKeyspace(SchemaConstants.METADATA_KEYSPACE_NAME))
{
Keyspaces kss = Keyspaces.none();
for (Pair<KeyspaceMetadata, Long> ksmGen : distributedKeyspacesWithGeneration(knownDatacenters))
kss = kss.with(ksmGen.left);
for (KeyspaceMetadata ksm : keyspaces) // on disk keyspaces
kss = kss.withAddedOrUpdated(kss.get(ksm.name)
.map(k -> merged(ksm, k))
.orElse(ksm));
keyspaces = kss;
}
return new DistributedSchema(keyspaces, Epoch.UPGRADE_GOSSIP);
Keyspaces kss = Keyspaces.none();
for (Pair<KeyspaceMetadata, Long> ksmGen : distributedKeyspacesWithGeneration())
kss = kss.with(ksmGen.left);
for (KeyspaceMetadata ksm : keyspaces) // on disk keyspaces
kss = kss.withAddedOrUpdated(kss.get(ksm.name)
.map(k -> merged(ksm, k))
.orElse(ksm));
return new DistributedSchema(kss, Epoch.UPGRADE_GOSSIP);
}

/**
Expand Down Expand Up @@ -268,7 +243,8 @@ public void initializeKeyspaceInstances(DistributedSchema prev, boolean loadSSTa
}
});

// Avoid system table side effects during initialization
// Avoid system table side effects before initialization, otherwise mismatching schema can block CMS
// progress as nodes report disagreement. Also, tooling should not mutate system tables.
if (epoch.isEqualOrAfter(Epoch.FIRST) && !DatabaseDescriptor.isClientOrToolInitialized())
{
Collection<Mutation> mutations = SchemaKeyspace.convertSchemaDiffToMutations(ksDiff, FBUtilities.timestampMicros());
Expand Down Expand Up @@ -481,6 +457,17 @@ private void validate()
});
}

public String conciseToString()
{
return keyspaces.stream()
.map(ksm -> ksm.name + ": {" +
ksm.tables.stream()
.map(t -> t.name)
.collect(Collectors.joining(","))
+ '}')
.collect(Collectors.joining(","));
}

public static class Serializer implements MetadataSerializer<DistributedSchema>
{
public void serialize(DistributedSchema t, DataOutputPlus out, Version version) throws IOException
Expand Down
14 changes: 11 additions & 3 deletions src/java/org/apache/cassandra/service/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2151,9 +2151,17 @@ private EndpointsByRange constructRangeToEndpointMap(String keyspace, List<Range
else
{
// Handling the keyspaces which are not handled by CMS like system keyspace which uses LocalStrategy.
AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
for (Range<Token> range : ranges)
rangeToEndpointMap.put(range, strategy.calculateNaturalReplicas(range.right, metadata));
Keyspace ks = Keyspace.openIfExists(keyspace);
if (ks != null)
{
AbstractReplicationStrategy strategy = ks.getReplicationStrategy();
for (Range<Token> range : ranges)
rangeToEndpointMap.put(range, strategy.calculateNaturalReplicas(range.right, metadata));
}
else
{
throw new IllegalArgumentException("Unknown keyspace " + keyspace);
}
}

return new EndpointsByRange(rangeToEndpointMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,4 +196,4 @@ private LogState toLogState(Epoch lastKnown)

public abstract ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry retryPolicy);
protected abstract boolean tryCommitOne(Entry.Id entryId, Transformation transform, Epoch previousEpoch, Epoch nextEpoch);
}
}
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/tcm/CMSOperations.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,14 @@ public Map<String, String> describeCMS()
ClusterMetadata metadata = ClusterMetadata.current();
String members = metadata.fullCMSMembers().stream().sorted().map(Object::toString).collect(Collectors.joining(","));
info.put(MEMBERS, members);
info.put(NEEDS_RECONFIGURATION, Boolean.toString(needsReconfiguration(metadata)));
info.put(NEEDS_RECONFIGURATION, Boolean.toString(metadata.epoch.isBefore(Epoch.FIRST) || needsReconfiguration(metadata)));
info.put(IS_MEMBER, Boolean.toString(cms.isCurrentMember(FBUtilities.getBroadcastAddressAndPort())));
info.put(SERVICE_STATE, ClusterMetadataService.state(metadata).toString());
info.put(IS_MIGRATING, Boolean.toString(cms.isMigrating()));
info.put(EPOCH, Long.toString(metadata.epoch.getEpoch()));
info.put(LOCAL_PENDING, Integer.toString(cms.log().pendingBufferSize()));
info.put(COMMITS_PAUSED, Boolean.toString(cms.commitsPaused()));
info.put(REPLICATION_FACTOR, ReplicationParams.meta(metadata).toString());
info.put(REPLICATION_FACTOR, metadata.epoch.isBefore(Epoch.FIRST) ? "" : ReplicationParams.meta(metadata).toString());
info.put(CMS_ID, Integer.toString(metadata.metadataIdentifier));
return info;
}
Expand Down
23 changes: 21 additions & 2 deletions src/java/org/apache/cassandra/tcm/ClusterMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -131,7 +132,7 @@ public ClusterMetadata(IPartitioner partitioner)
@VisibleForTesting
public ClusterMetadata(IPartitioner partitioner, Directory directory)
{
this(partitioner, directory, DistributedSchema.first(directory.knownDatacenters()));
this(partitioner, directory, DistributedSchema.empty());
}

@VisibleForTesting
Expand Down Expand Up @@ -216,20 +217,29 @@ private ClusterMetadata(int metadataIdentifier,

public Set<InetAddressAndPort> fullCMSMembers()
{
if (epoch.isBefore(Epoch.FIRST))
return Collections.emptySet();

if (fullCMSEndpoints == null)
this.fullCMSEndpoints = ImmutableSet.copyOf(placements.get(ReplicationParams.meta(this)).reads.byEndpoint().keySet());
return fullCMSEndpoints;
}

public Set<NodeId> fullCMSMemberIds()
{
if (epoch.isBefore(Epoch.FIRST))
return Collections.emptySet();

if (fullCMSIds == null)
this.fullCMSIds = placements.get(ReplicationParams.meta(this)).reads.byEndpoint().keySet().stream().map(directory::peerId).collect(toImmutableSet());
return fullCMSIds;
}

public EndpointsForRange fullCMSMembersAsReplicas()
{
if (epoch.isBefore(Epoch.FIRST))
return EndpointsForRange.empty(MetaStrategy.entireRange);

if (fullCMSReplicas == null)
fullCMSReplicas = placements.get(ReplicationParams.meta(this)).reads.forRange(MetaStrategy.entireRange).get();
return fullCMSReplicas;
Expand Down Expand Up @@ -309,7 +319,7 @@ private static Map<ExtensionKey<?,?>, ExtensionValue<?>> capLastModified(Map<Ext
@SuppressWarnings("unchecked")
private static <V> V capLastModified(MetadataValue<V> value, Epoch maxEpoch)
{
return value == null || value.lastModified().isEqualOrBefore(maxEpoch)
return value == null || (value.lastModified().isEqualOrAfter(Epoch.EMPTY) && value.lastModified().isEqualOrBefore(maxEpoch))
? (V)value
: value.withLastModified(maxEpoch);
}
Expand Down Expand Up @@ -946,6 +956,15 @@ public String toString()
'}';
}

public String conciseToString()
{
return "ClusterMetadata{" + "epoch=" + epoch +
", schema=" + schema.conciseToString() +
", directory=" + directory.conciseToString(tokenMap.asMap()) +
", placements=" + placements.conciseToString() +
'}';
}

@Override
public boolean equals(Object o)
{
Expand Down
Loading