From a1353204ee585e0f073db5546fac5366cb7fb13a Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Fri, 17 Apr 2026 12:07:30 +0200 Subject: [PATCH 1/6] [CASSANDRA-21477] Correctly invalidate cached KeyspaceMetadata when in a pre-initialized state --- .../apache/cassandra/tcm/ClusterMetadata.java | 2 +- ...usterMetadataUpgradeDC2InitializeTest.java | 64 +++++++++++++++++++ 2 files changed, 65 insertions(+), 1 deletion(-) create mode 100644 test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeDC2InitializeTest.java diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java index 5645b439e4b3..bb4f31e71cdf 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java @@ -309,7 +309,7 @@ private static Map, ExtensionValue> capLastModified(Map V capLastModified(MetadataValue value, Epoch maxEpoch) { - return value == null || value.lastModified().isEqualOrBefore(maxEpoch) + return value == null || (value.lastModified().isEqualOrAfter(Epoch.EMPTY) && value.lastModified().isEqualOrBefore(maxEpoch)) ? (V)value : value.withLastModified(maxEpoch); } diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeDC2InitializeTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeDC2InitializeTest.java new file mode 100644 index 000000000000..cf4e47e152db --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeDC2InitializeTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.upgrade; + +import com.google.common.collect.ImmutableMap; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.distributed.Constants; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.distributed.shared.NetworkTopology; + +import static org.junit.Assert.assertFalse; + +public class ClusterMetadataUpgradeDC2InitializeTest extends UpgradeTestBase +{ + public static final Logger logger = LoggerFactory.getLogger(ClusterMetadataUpgradeDC2InitializeTest.class); + @Test + public void testCMSInitializeOnDC2NodeAfterUpgrade() throws Throwable + { + new TestCase() + .nodes(2) + .nodesToUpgrade(1, 2) + .withNodeIdTopology(ImmutableMap.of(1, NetworkTopology.dcAndRack("dc1", "rack1"), + 2, NetworkTopology.dcAndRack("dc2", "rack2"))) + .withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP) + .set(Constants.KEY_DTEST_FULL_STARTUP, true)) + .upgradesToCurrentFrom(v50) + .setup((cluster) -> { + }) + .runBeforeClusterUpgrade(cluster -> { + cluster.forEach(node -> { + node.flush("system"); + }); + }) + .runAfterClusterUpgrade((cluster) -> { + // Run cms initialize on the node in dc2 (node 2) instead of dc1. + cluster.get(2).nodetoolResult("cms", "initialize").asserts().success(); + + cluster.forEach(i -> assertFalse("node " + i.config().num() + " is still in MIGRATING STATE", + ClusterUtils.isMigrating((IInvokableInstance) i))); + }).run(); + } +} From bccf1f23f634b382a8685de8692abd9385888510 Mon Sep 17 00:00:00 2001 From: Sam Tunnicliffe Date: Thu, 16 Apr 2026 15:53:05 +0100 Subject: [PATCH 2/6] [CASSANDRA-21477] Defer the creation of system_cluster_metadata keyspace until CMS initialization Also allows the actual PreInit entry to be included in the dist log table by means of a post-log-bootstrap callback. * Remove implicit insert on subsequent commit, i.e. of the Initialize entry * Enables the full specifics of PreInit to be encoded in the serialized form, removing the special casing for first-cms-member and other nodes Co-authored-by: Alex Petrov Co-authored-by: Marcus Eriksson [CASSANDRA-21477] PreInitialize serialization change in breaking bootstraps --- .../DistributedMetadataLogKeyspace.java | 31 +++-- .../cassandra/schema/DistributedSchema.java | 63 ++++------ .../cassandra/service/StorageService.java | 14 ++- .../cassandra/tcm/AbstractLocalProcessor.java | 2 +- .../apache/cassandra/tcm/CMSOperations.java | 4 +- .../apache/cassandra/tcm/ClusterMetadata.java | 21 +++- .../cassandra/tcm/ClusterMetadataService.java | 64 +++++++++- .../org/apache/cassandra/tcm/Startup.java | 25 ++-- .../tcm/compatibility/GossipHelper.java | 4 +- .../apache/cassandra/tcm/log/LocalLog.java | 17 +-- .../cassandra/tcm/membership/Directory.java | 26 ++++ .../migration/CMSInitializationRequest.java | 10 ++ .../tcm/ownership/DataPlacements.java | 6 + .../tcm/transformations/cms/Initialize.java | 4 +- .../transformations/cms/PreInitialize.java | 41 +++---- .../ClusterMetadataSingleNodeUpgradeTest.java | 12 +- ...etadataUpgradeInconsistentPeersV2Test.java | 114 ++++++++++++++++++ .../upgrade/ClusterMetadataUpgradeTest.java | 10 +- .../org/apache/cassandra/ServerTestUtils.java | 8 +- .../schema/DistributedSchemaTest.java | 5 +- .../tcm/log/DistributedLogStateTest.java | 7 +- .../tcm/log/LogListenerNotificationTest.java | 2 +- .../cassandra/utils/CassandraGenerators.java | 2 +- 23 files changed, 365 insertions(+), 127 deletions(-) create mode 100644 test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeInconsistentPeersV2Test.java diff --git a/src/java/org/apache/cassandra/schema/DistributedMetadataLogKeyspace.java b/src/java/org/apache/cassandra/schema/DistributedMetadataLogKeyspace.java index 02c710ae49fc..ce8ebd5e10c5 100644 --- a/src/java/org/apache/cassandra/schema/DistributedMetadataLogKeyspace.java +++ b/src/java/org/apache/cassandra/schema/DistributedMetadataLogKeyspace.java @@ -46,6 +46,7 @@ import org.apache.cassandra.tcm.transformations.cms.PreInitialize; import org.apache.cassandra.utils.JVMStabilityInspector; +import static org.apache.cassandra.schema.SchemaConstants.METADATA_KEYSPACE_NAME; import static org.apache.cassandra.tcm.Epoch.FIRST; public final class DistributedMetadataLogKeyspace @@ -65,7 +66,7 @@ private DistributedMetadataLogKeyspace(){} */ public static final long GENERATION = 0; - public static final TableId LOG_TABLE_ID = TableId.unsafeDeterministic(SchemaConstants.METADATA_KEYSPACE_NAME, TABLE_NAME); + public static final TableId LOG_TABLE_ID = TableId.unsafeDeterministic(METADATA_KEYSPACE_NAME, TABLE_NAME); public static final String LOG_TABLE_CQL = "CREATE TABLE %s.%s (" + "epoch bigint," + "entry_id bigint," @@ -80,16 +81,16 @@ private DistributedMetadataLogKeyspace(){} "compaction_window_size","1"))) .build(); - public static boolean initialize() throws IOException + public static boolean insertPreInitialize(PreInitialize preInit) throws IOException { try { String init = String.format("INSERT INTO %s.%s (epoch, transformation, kind, entry_id) " + "VALUES(?, ?, ?, ?) " + - "IF NOT EXISTS", SchemaConstants.METADATA_KEYSPACE_NAME, TABLE_NAME); + "IF NOT EXISTS", METADATA_KEYSPACE_NAME, TABLE_NAME); UntypedResultSet result = QueryProcessor.execute(init, ConsistencyLevel.QUORUM, FIRST.getEpoch(), - Transformation.Kind.PRE_INITIALIZE_CMS.toVersionedBytes(PreInitialize.blank()), + Transformation.Kind.PRE_INITIALIZE_CMS.toVersionedBytes(preInit), Transformation.Kind.PRE_INITIALIZE_CMS.id, Entry.Id.NONE.entryId); @@ -124,21 +125,25 @@ public static boolean tryCommit(Entry.Id entryId, { try { - if (previousEpoch.is(FIRST) && !initialize()) + // log is not initialized yet this is unexpected + if (previousEpoch.isBefore(FIRST)) + { + logger.warn("Previous epoch {} indicates that the {} has not been initialized yet, " + + "not committing entry {}/{} at epoch {}", + previousEpoch, METADATA_KEYSPACE_NAME, entryId, transform, nextEpoch); return false; + } - // TODO get lowest supported metadata version from ClusterMetadata - ByteBuffer serializedEvent = transform.kind().toVersionedBytes(transform); - + ByteBuffer serializedTransform = transform.kind().toVersionedBytes(transform); String query = String.format("INSERT INTO %s.%s (epoch, entry_id, transformation, kind) " + "VALUES (?, ?, ?, ?) " + "IF NOT EXISTS;", - SchemaConstants.METADATA_KEYSPACE_NAME, TABLE_NAME); + METADATA_KEYSPACE_NAME, TABLE_NAME); UntypedResultSet result = QueryProcessor.execute(query, ConsistencyLevel.QUORUM, nextEpoch.getEpoch(), entryId.entryId, - serializedEvent, + serializedTransform, transform.kind().id); return result.one().getBoolean("[applied]"); @@ -186,7 +191,7 @@ public EntryHolder getEntries(Epoch since) throws IOException // note that we want all entries with epoch >= since - but since we use a reverse partitioner, we actually // want all entries where the token is less than token(since) UntypedResultSet resultSet = execute(String.format("SELECT epoch, kind, transformation, entry_id FROM %s.%s WHERE token(epoch) <= token(?)", - SchemaConstants.METADATA_KEYSPACE_NAME, TABLE_NAME), + METADATA_KEYSPACE_NAME, TABLE_NAME), consistencyLevel, since.getEpoch()); EntryHolder entryHolder = new EntryHolder(since); for (UntypedResultSet.Row row : resultSet) @@ -237,7 +242,7 @@ private static UntypedResultSet execute(String query, ConsistencyLevel cl, Objec private static TableMetadata.Builder parse(String cql, String table, String description) { - return CreateTableStatement.parse(String.format(cql, SchemaConstants.METADATA_KEYSPACE_NAME, table), SchemaConstants.METADATA_KEYSPACE_NAME) + return CreateTableStatement.parse(String.format(cql, METADATA_KEYSPACE_NAME, table), METADATA_KEYSPACE_NAME) .id(LOG_TABLE_ID) .epoch(FIRST) .comment(description); @@ -245,7 +250,7 @@ private static TableMetadata.Builder parse(String cql, String table, String desc public static KeyspaceMetadata initialMetadata(Set knownDatacenters) { - return KeyspaceMetadata.create(SchemaConstants.METADATA_KEYSPACE_NAME, new KeyspaceParams(true, ReplicationParams.simpleMeta(1, knownDatacenters), FastPathStrategy.simple()), Tables.of(Log)); + return KeyspaceMetadata.create(METADATA_KEYSPACE_NAME, new KeyspaceParams(true, ReplicationParams.simpleMeta(1, knownDatacenters), FastPathStrategy.simple()), Tables.of(Log)); } public static KeyspaceMetadata initialMetadata(String datacenter) diff --git a/src/java/org/apache/cassandra/schema/DistributedSchema.java b/src/java/org/apache/cassandra/schema/DistributedSchema.java index 763632fd6650..59859d75788e 100644 --- a/src/java/org/apache/cassandra/schema/DistributedSchema.java +++ b/src/java/org/apache/cassandra/schema/DistributedSchema.java @@ -21,14 +21,13 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -65,25 +64,6 @@ public static DistributedSchema empty() return new DistributedSchema(Keyspaces.none(), Epoch.EMPTY); } - public static DistributedSchema first(Set knownDatacenters) - { - // During upgrades from pre-6.0 versions, the replication params of the system_cluster_metadata - // keyspace using one of the existing DCs. This is so that this keyspace does not cause issues - // for tooling, clients or control plane systems which may inspect schema and have specific - // expectations about DC layout. This keyspace is unused until the CMS is initialized. - // For new clusters which start out on 6.0 or later, this is not necessary to the initial - // replication params use a empty string for the placeholder DC name. - - // During CMS initialization, the replication of this keyspace will be set for real using - // the DC of the first node to become a CMS member. This happens in the PreInitialize - // transformation when executed on the first CMS member. - - if (knownDatacenters.isEmpty()) - knownDatacenters = Collections.singleton(""); - - return new DistributedSchema(Keyspaces.of(DistributedMetadataLogKeyspace.initialMetadata(knownDatacenters)), Epoch.FIRST); - } - private static ImmutableMap keyspacesToTableMap(Keyspaces keyspaces) { ImmutableMap.Builder builder = ImmutableMap.builder(); @@ -159,28 +139,23 @@ public boolean hasAccordKeyspaces() * @deprecated since TCM, used on upgrade from gossip to populate system schema tables with the correct generation */ @Deprecated(since = "TCM") - public static List> distributedKeyspacesWithGeneration(Set knownDatacenters) + public static List> distributedKeyspacesWithGeneration() { - return ImmutableList.of(Pair.create(DistributedMetadataLogKeyspace.initialMetadata(knownDatacenters), DistributedMetadataLogKeyspace.GENERATION), - Pair.create(TraceKeyspace.metadata(), TraceKeyspace.GENERATION), + return ImmutableList.of(Pair.create(TraceKeyspace.metadata(), TraceKeyspace.GENERATION), Pair.create(SystemDistributedKeyspace.metadata(), SystemDistributedKeyspace.GENERATION), Pair.create(AuthKeyspace.metadata(),AuthKeyspace.GENERATION)); } - public static DistributedSchema fromSystemTables(Keyspaces keyspaces, Set knownDatacenters) + public static DistributedSchema fromSystemTables(Keyspaces keyspaces) { - if (!keyspaces.containsKeyspace(SchemaConstants.METADATA_KEYSPACE_NAME)) - { - Keyspaces kss = Keyspaces.none(); - for (Pair ksmGen : distributedKeyspacesWithGeneration(knownDatacenters)) - kss = kss.with(ksmGen.left); - for (KeyspaceMetadata ksm : keyspaces) // on disk keyspaces - kss = kss.withAddedOrUpdated(kss.get(ksm.name) - .map(k -> merged(ksm, k)) - .orElse(ksm)); - keyspaces = kss; - } - return new DistributedSchema(keyspaces, Epoch.UPGRADE_GOSSIP); + Keyspaces kss = Keyspaces.none(); + for (Pair ksmGen : distributedKeyspacesWithGeneration()) + kss = kss.with(ksmGen.left); + for (KeyspaceMetadata ksm : keyspaces) // on disk keyspaces + kss = kss.withAddedOrUpdated(kss.get(ksm.name) + .map(k -> merged(ksm, k)) + .orElse(ksm)); + return new DistributedSchema(kss, Epoch.UPGRADE_GOSSIP); } /** @@ -268,7 +243,8 @@ public void initializeKeyspaceInstances(DistributedSchema prev, boolean loadSSTa } }); - // Avoid system table side effects during initialization + // Avoid system table side effects before initialization, otherwise mismatching schema can block CMS + // progress as nodes report disagreement. Also, tooling should not mutate system tables. if (epoch.isEqualOrAfter(Epoch.FIRST) && !DatabaseDescriptor.isClientOrToolInitialized()) { Collection mutations = SchemaKeyspace.convertSchemaDiffToMutations(ksDiff, FBUtilities.timestampMicros()); @@ -481,6 +457,17 @@ private void validate() }); } + public String conciseToString() + { + return keyspaces.stream() + .map(ksm -> ksm.name + ": {" + + ksm.tables.stream() + .map(t -> t.name) + .collect(Collectors.joining(",")) + + '}') + .collect(Collectors.joining(",")); + } + public static class Serializer implements MetadataSerializer { public void serialize(DistributedSchema t, DataOutputPlus out, Version version) throws IOException diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 0b461093348d..2ad2a5bf278d 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -2151,9 +2151,17 @@ private EndpointsByRange constructRangeToEndpointMap(String keyspace, List range : ranges) - rangeToEndpointMap.put(range, strategy.calculateNaturalReplicas(range.right, metadata)); + Keyspace ks = Keyspace.openIfExists(keyspace); + if (ks != null) + { + AbstractReplicationStrategy strategy = ks.getReplicationStrategy(); + for (Range range : ranges) + rangeToEndpointMap.put(range, strategy.calculateNaturalReplicas(range.right, metadata)); + } + else + { + throw new IllegalArgumentException("Unknown keyspace " + keyspace); + } } return new EndpointsByRange(rangeToEndpointMap); diff --git a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java index 0f42bae46261..a245377f9171 100644 --- a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java +++ b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java @@ -196,4 +196,4 @@ private LogState toLogState(Epoch lastKnown) public abstract ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry retryPolicy); protected abstract boolean tryCommitOne(Entry.Id entryId, Transformation transform, Epoch previousEpoch, Epoch nextEpoch); -} +} \ No newline at end of file diff --git a/src/java/org/apache/cassandra/tcm/CMSOperations.java b/src/java/org/apache/cassandra/tcm/CMSOperations.java index aa49a8ac3bf8..d2b18ce36122 100644 --- a/src/java/org/apache/cassandra/tcm/CMSOperations.java +++ b/src/java/org/apache/cassandra/tcm/CMSOperations.java @@ -154,14 +154,14 @@ public Map describeCMS() ClusterMetadata metadata = ClusterMetadata.current(); String members = metadata.fullCMSMembers().stream().sorted().map(Object::toString).collect(Collectors.joining(",")); info.put(MEMBERS, members); - info.put(NEEDS_RECONFIGURATION, Boolean.toString(needsReconfiguration(metadata))); + info.put(NEEDS_RECONFIGURATION, Boolean.toString(metadata.epoch.isBefore(Epoch.FIRST) || needsReconfiguration(metadata))); info.put(IS_MEMBER, Boolean.toString(cms.isCurrentMember(FBUtilities.getBroadcastAddressAndPort()))); info.put(SERVICE_STATE, ClusterMetadataService.state(metadata).toString()); info.put(IS_MIGRATING, Boolean.toString(cms.isMigrating())); info.put(EPOCH, Long.toString(metadata.epoch.getEpoch())); info.put(LOCAL_PENDING, Integer.toString(cms.log().pendingBufferSize())); info.put(COMMITS_PAUSED, Boolean.toString(cms.commitsPaused())); - info.put(REPLICATION_FACTOR, ReplicationParams.meta(metadata).toString()); + info.put(REPLICATION_FACTOR, metadata.epoch.isBefore(Epoch.FIRST) ? "" : ReplicationParams.meta(metadata).toString()); info.put(CMS_ID, Integer.toString(metadata.metadataIdentifier)); return info; } diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java index bb4f31e71cdf..40dcb7fb4c41 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -131,7 +132,7 @@ public ClusterMetadata(IPartitioner partitioner) @VisibleForTesting public ClusterMetadata(IPartitioner partitioner, Directory directory) { - this(partitioner, directory, DistributedSchema.first(directory.knownDatacenters())); + this(partitioner, directory, DistributedSchema.empty()); } @VisibleForTesting @@ -216,6 +217,9 @@ private ClusterMetadata(int metadataIdentifier, public Set fullCMSMembers() { + if (epoch.isBefore(Epoch.FIRST)) + return Collections.emptySet(); + if (fullCMSEndpoints == null) this.fullCMSEndpoints = ImmutableSet.copyOf(placements.get(ReplicationParams.meta(this)).reads.byEndpoint().keySet()); return fullCMSEndpoints; @@ -223,6 +227,9 @@ public Set fullCMSMembers() public Set fullCMSMemberIds() { + if (epoch.isBefore(Epoch.FIRST)) + return Collections.emptySet(); + if (fullCMSIds == null) this.fullCMSIds = placements.get(ReplicationParams.meta(this)).reads.byEndpoint().keySet().stream().map(directory::peerId).collect(toImmutableSet()); return fullCMSIds; @@ -230,6 +237,9 @@ public Set fullCMSMemberIds() public EndpointsForRange fullCMSMembersAsReplicas() { + if (epoch.isBefore(Epoch.FIRST)) + return EndpointsForRange.empty(MetaStrategy.entireRange); + if (fullCMSReplicas == null) fullCMSReplicas = placements.get(ReplicationParams.meta(this)).reads.forRange(MetaStrategy.entireRange).get(); return fullCMSReplicas; @@ -946,6 +956,15 @@ public String toString() '}'; } + public String conciseToString() + { + return "ClusterMetadata{" + "epoch=" + epoch + + ", schema=" + schema.conciseToString() + + ", directory=" + directory.conciseToString(tokenMap.asMap()) + + ", placements=" + placements.conciseToString() + + '}'; + } + @Override public boolean equals(Object o) { diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java index 9c2c4ca0bfe0..6caea0001fc1 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java @@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -52,6 +53,7 @@ import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessageDelivery; +import org.apache.cassandra.schema.DistributedMetadataLogKeyspace; import org.apache.cassandra.schema.DistributedSchema; import org.apache.cassandra.schema.Keyspaces; import org.apache.cassandra.schema.ReplicationParams; @@ -79,6 +81,7 @@ import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.tcm.transformations.ForceSnapshot; import org.apache.cassandra.tcm.transformations.TriggerSnapshot; +import org.apache.cassandra.tcm.transformations.cms.PreInitialize; import org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; @@ -138,6 +141,14 @@ public static ClusterMetadataService instance() private final LocalLog log; private final MetadataSnapshots snapshots; + /* + * Special callback for execution when the PreInitialize transformation is used to bootstrap the cluster metadata + * log. In practice, this is only relevant when using PaxosBackedProcessor which needs to insert the PreInitialize + * entry into the distributed log table, but is unable to do so until it has been enacted, the keyspace created and + * the replication configured. + */ + private final Consumer logBootstrapCallback; + private final IVerbHandler replicationHandler; private final IVerbHandler logNotifyHandler; private final IVerbHandler fetchLogHandler; @@ -181,13 +192,17 @@ public static State state(ClusterMetadata metadata) if (CassandraRelevantProperties.TCM_USE_ATOMIC_LONG_PROCESSOR.getBoolean()) { log = logSpec.sync().withStorage(new AtomicLongBackedProcessor.InMemoryStorage()).createLog(); - localProcessor = wrapProcessor.apply(new AtomicLongBackedProcessor(log, logSpec.isReset())); + AtomicLongBackedProcessor processor = new AtomicLongBackedProcessor(log, logSpec.isReset()); + logBootstrapCallback = logBootstrapCallback(processor); + localProcessor = wrapProcessor.apply(processor); fetchLogHandler = new FetchCMSLog.Handler((e, ignored) -> logSpec.storage().getLogState(e)); } else { log = logSpec.async().createLog(); - localProcessor = wrapProcessor.apply(new PaxosBackedProcessor(log)); + PaxosBackedProcessor processor = new PaxosBackedProcessor(log); + logBootstrapCallback = logBootstrapCallback(processor); + localProcessor = wrapProcessor.apply(processor); fetchLogHandler = new FetchCMSLog.Handler(); } @@ -234,8 +249,12 @@ public ClusterMetadataService(PlacementProvider placementProvider, commitRequestHandler = isMemberOfOwnershipGroup ? new Commit.Handler(processor, replicator, () -> LOCAL) : null; peerLogFetcher = new PeerLogFetcher(log); + logBootstrapCallback = logBootstrapCallback(processor); } + /** + * Only called from initializeForTools + */ private ClusterMetadataService(PlacementProvider placementProvider, MetadataSnapshots snapshots, LocalLog log, @@ -257,6 +276,7 @@ private ClusterMetadataService(PlacementProvider placementProvider, this.fetchLogHandler = fetchLogHandler; this.commitRequestHandler = commitRequestHandler; this.peerLogFetcher = peerLogFetcher; + this.logBootstrapCallback = logBootstrapCallback(processor); } @SuppressWarnings("resource") @@ -265,8 +285,7 @@ public static void initializeForTools(boolean loadSSTables) if (instance != null) return; String localDC = DatabaseDescriptor.getLocalDataCenter(); - ClusterMetadata emptyFromSystemTables = emptyWithSchemaFromSystemTables(Collections.singleton(localDC)) - .forceEpoch(Epoch.EMPTY); + ClusterMetadata emptyFromSystemTables = emptyWithSchemaFromSystemTables().forceEpoch(Epoch.EMPTY); LocalLog.LogSpec logSpec = LocalLog.logSpec() .withInitialState(emptyFromSystemTables) @@ -294,7 +313,7 @@ public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next, boolean new PeerLogFetcher(log)); log.readyUnchecked(); - log.bootstrap(FBUtilities.getBroadcastAddressAndPort(), localDC); + log.bootstrap(FBUtilities.getBroadcastAddressAndPort(), localDC, (p) -> {}); ClusterMetadataService.setInstance(cms); } @@ -336,10 +355,43 @@ public static void empty(Keyspaces keyspaces) null); log.readyUnchecked(); - log.bootstrap(FBUtilities.getBroadcastAddressAndPort(), localDC); + log.bootstrap(FBUtilities.getBroadcastAddressAndPort(), localDC, (p) -> {}); ClusterMetadataService.setInstance(cms); } + /* + * Hook to be executed when the LocalLog is bootstrapped with the PreInitialize transformation. This is done on + * the first CMS member to set up the initial replication and data placements for the metadata keyspace. + */ + public Consumer logBootstrapCallback() + { + return logBootstrapCallback; + } + + private static Consumer logBootstrapCallback(Processor processor) + { + if (processor instanceof PaxosBackedProcessor) + { + // Insert an entry containing the PRE_INITIALIZE_CMS transform at Epoch.FIRST in the distributed + // log table. This can only be done after the log is bootstrapped as it depends on the effects of + // that transform on ClusterMetadata. + return preInit -> { + try + { + if (DistributedMetadataLogKeyspace.insertPreInitialize(preInit)) + logger.info("Successfully inserted pre-initialize entry into distributed metadata log"); + else + throw new IllegalStateException("Failed to insert pre-initialize entry into distributed metadata log. Check server for details"); + } + catch (IOException e) + { + throw new IllegalStateException("Unable to pre-initialize distributed metadata log table", e); + } + }; + } + // otherwise, this is a noop. + return preInit -> {}; + } @SuppressWarnings("resource") public static void initializeForClients() diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java index 2bfe4b4cc948..175769798143 100644 --- a/src/java/org/apache/cassandra/tcm/Startup.java +++ b/src/java/org/apache/cassandra/tcm/Startup.java @@ -134,11 +134,14 @@ public static void initialize(Set seeds, } /** - * Make this node a _first_ CMS node. + * Make this node the _first_ CMS node. *

- * (1) Append PreInitialize transformation to local in-memory log. When distributed metadata keyspace is initialized, a no-op transformation will - * be added to other nodes. This is required since as of now, no node actually owns distributed metadata keyspace. - * (2) Commit Initialize transformation, which holds a snapshot of metadata as of now. + * (1) Append PreInitialize transformation to local in-memory log. + * (1a) Once this enacted and the distributed metadata keyspace is initialized, the PreInitialize transformation + * will be inserted into the log table. This is required since as before this point, the keyspace was not availble + * or configured with any replication or placements. + * (2) Commit Initialize transformation, which holds a complete snapshot of metadata as of now. + * Other nodes in the cluster, if there are any, will receive both of these log entries and enact them locally. *

* This process is applicable for gossip upgrades as well as regular vote-and-startup process. */ @@ -146,7 +149,8 @@ public static void initializeAsFirstCMSNode() { InetAddressAndPort addr = FBUtilities.getBroadcastAddressAndPort(); String datacenter = DatabaseDescriptor.getLocator().local().datacenter; - ClusterMetadataService.instance().log().bootstrap(addr, datacenter); + ClusterMetadataService cms = ClusterMetadataService.instance(); + cms.log().bootstrap(addr, datacenter, cms.logBootstrapCallback()); ClusterMetadata metadata = ClusterMetadata.current(); assert ClusterMetadataService.state() == LOCAL : String.format("Can't initialize as node hasn't transitioned to CMS state. State: %s.\n%s", ClusterMetadataService.state(), metadata); Initialize initialize = new Initialize(metadata.initializeClusterIdentifier(addr.hashCode())); @@ -262,9 +266,9 @@ public static void initializeForDiscovery(Runnable initMessaging) Election.instance.migrated(); } - private static void updateSystemSchemaTables(Set knownDatacenters) + private static void updateSystemSchemaTables() { - List> kss = DistributedSchema.distributedKeyspacesWithGeneration(knownDatacenters); + List> kss = DistributedSchema.distributedKeyspacesWithGeneration(); List mutations = new ArrayList<>(); for (Pair ksm : kss) { @@ -279,9 +283,8 @@ private static void updateSystemSchemaTables(Set knownDatacenters) */ public static void initializeFromGossip(Function wrapProcessor, Runnable initMessaging) throws StartupException { - Set knownDcs = SystemKeyspace.allKnownDatacenters(); - updateSystemSchemaTables(knownDcs); - ClusterMetadata emptyFromSystemTables = emptyWithSchemaFromSystemTables(knownDcs); + updateSystemSchemaTables(); + ClusterMetadata emptyFromSystemTables = emptyWithSchemaFromSystemTables(); LocalLog.LogSpec logSpec = LocalLog.logSpec() .withInitialState(emptyFromSystemTables) .afterReplay(Startup::scrubDataDirectories, @@ -332,7 +335,7 @@ public static void initializeFromGossip(Function wrapProce logger.debug("Got epStates {}", epStates); ClusterMetadata initial = fromEndpointStates(emptyFromSystemTables.schema, epStates); - logger.debug("Created initial ClusterMetadata {}", initial); + logger.info("Created initial ClusterMetadata {}", initial.conciseToString()); ClusterMetadataService.instance().setFromGossip(initial); Gossiper.instance.clearUnsafe(); diff --git a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java index 4ba870837a34..965751c554a4 100644 --- a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java +++ b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java @@ -287,7 +287,7 @@ private static NodeVersion getVersionFromEndpointState(InetAddressAndPort endpoi return NodeVersion.fromCassandraVersion(cassandraVersion); } - public static ClusterMetadata emptyWithSchemaFromSystemTables(Set allKnownDatacenters) + public static ClusterMetadata emptyWithSchemaFromSystemTables() { // If this instance was previously upgraded then subsequently downgraded, the metadata keyspace may have been // added to system_schema tables. If so, don't include it in the initial schema as this will cause it to be @@ -297,7 +297,7 @@ public static ClusterMetadata emptyWithSchemaFromSystemTables(Set allKno .filter(k -> !k.name.equals(SchemaConstants.METADATA_KEYSPACE_NAME)); return new ClusterMetadata(Epoch.UPGRADE_STARTUP, DatabaseDescriptor.getPartitioner(), - DistributedSchema.fromSystemTables(keyspaces, allKnownDatacenters), + DistributedSchema.fromSystemTables(keyspaces), Directory.EMPTY, new TokenMap(DatabaseDescriptor.getPartitioner()), DataPlacements.empty(), diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java b/src/java/org/apache/cassandra/tcm/log/LocalLog.java index b3069eb029fb..cd6fb0912902 100644 --- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java +++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -277,22 +278,24 @@ private LocalLog(LogSpec logSpec) @VisibleForTesting public void unsafeBootstrapForTesting(InetAddressAndPort addr) { - bootstrap(addr, ""); + bootstrap(addr, "", (p) -> {}); } /** - * - * @param addr - * @param datacenter + * Set up the initial state of the local ClusterMetadata, enacting the PreInitialize transform which sets up the + * distributed metadata log keyspace, its replication, and data placements. + * @param addr Address of the first CMS member, this should be the local node address + * @param datacenter DC of the first CMS member */ - public void bootstrap(InetAddressAndPort addr, String datacenter) + public void bootstrap(InetAddressAndPort addr, String datacenter, Consumer postBootstrap) { ClusterMetadata metadata = metadata(); assert metadata.epoch.isBefore(FIRST) : String.format("Metadata epoch %s should be before first", metadata.epoch); - Transformation transform = PreInitialize.withFirstCMS(addr, datacenter); + PreInitialize transform = PreInitialize.withFirstCMS(addr, datacenter); append(new Entry(Entry.Id.NONE, FIRST, transform)); metadata = waitForHighestConsecutive(); assert metadata.epoch.is(Epoch.FIRST) : String.format("Epoch: %s. CMS: %s", metadata.epoch, metadata.fullCMSMembers()); + postBootstrap.accept(transform); } public LogStorage storage() @@ -917,7 +920,7 @@ protected void addListeners() private LogListener snapshotListener() { return (entry, metadata) -> { - if (ClusterMetadataService.state() != ClusterMetadataService.State.LOCAL) + if (entry.epoch.isEqualOrBefore(Epoch.FIRST) || ClusterMetadataService.state() != ClusterMetadataService.State.LOCAL) return; if ((entry.epoch.getEpoch() % DatabaseDescriptor.getMetadataSnapshotFrequency()) == 0) diff --git a/src/java/org/apache/cassandra/tcm/membership/Directory.java b/src/java/org/apache/cassandra/tcm/membership/Directory.java index 55a5e7ac9687..f86481f1418a 100644 --- a/src/java/org/apache/cassandra/tcm/membership/Directory.java +++ b/src/java/org/apache/cassandra/tcm/membership/Directory.java @@ -42,6 +42,7 @@ import accord.utils.btree.BTreeSet; import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.locator.InetAddressAndPort; @@ -50,6 +51,7 @@ import org.apache.cassandra.tcm.MetadataValue; import org.apache.cassandra.tcm.serialization.MetadataSerializer; import org.apache.cassandra.tcm.serialization.Version; +import org.apache.cassandra.utils.SortedBiMultiValMap; import org.apache.cassandra.utils.UUIDSerializer; import org.apache.cassandra.utils.btree.BTreeBiMap; import org.apache.cassandra.utils.btree.BTreeMap; @@ -524,6 +526,30 @@ public String toDebugString() .collect(Collectors.joining("\n")); } + /** + * nodeId(address): {state, tokens, ser ver, cass ver, dc:rack} + */ + public String conciseToString(SortedBiMultiValMap tokenMap) + { + return peers.entrySet().stream().map((peerEntry) -> { + StringBuilder sb = new StringBuilder(); + NodeId nodeId = peerEntry.getKey(); + sb.append(nodeId.id()).append('(').append(peerEntry.getValue()).append("): "); + sb.append('{'); + sb.append(states.get(nodeId).ordinal()); + if (tokenMap != null) + sb.append(',').append(tokenMap.inverse().get(nodeId)); + NodeVersion version = versions.get(nodeId); + if (version != null) + sb.append(',').append(version.serializationVersion).append(',').append(version.cassandraVersion); + Location loc = locations.get(nodeId); + if (loc != null) + sb.append(',').append(loc.datacenter).append(':').append(loc.rack); + sb.append('}'); + return sb.toString(); + }).collect(Collectors.joining(",")); + } + private static class Node { public static final Serializer serializer = new Serializer(); diff --git a/src/java/org/apache/cassandra/tcm/migration/CMSInitializationRequest.java b/src/java/org/apache/cassandra/tcm/migration/CMSInitializationRequest.java index 599bfca0da7d..a76d35ca70e3 100644 --- a/src/java/org/apache/cassandra/tcm/migration/CMSInitializationRequest.java +++ b/src/java/org/apache/cassandra/tcm/migration/CMSInitializationRequest.java @@ -68,6 +68,16 @@ public CMSInitializationRequest(Initiator initiator, Directory directory, TokenM this.schemaVersion = schemaVersion; } + @Override + public String toString() + { + return "CMSInitializationRequest{" + + "initiator=" + initiator + + ", directory=" + directory.conciseToString(tokenMap.asMap()) + + ", schemaVersion=" + schemaVersion + + '}'; + } + public static class Serializer implements IVersionedSerializer { private final Version serializationVersion; diff --git a/src/java/org/apache/cassandra/tcm/ownership/DataPlacements.java b/src/java/org/apache/cassandra/tcm/ownership/DataPlacements.java index a4dc95eef47c..d813e8c28779 100644 --- a/src/java/org/apache/cassandra/tcm/ownership/DataPlacements.java +++ b/src/java/org/apache/cassandra/tcm/ownership/DataPlacements.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Objects; import java.util.function.BiConsumer; +import java.util.stream.Collectors; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; @@ -219,6 +220,11 @@ public Builder unbuild() return new Builder(new HashMap<>(this.asMap())); } + public String conciseToString() + { + return keys().stream().map(ReplicationParams::toString).collect(Collectors.joining(",")); + } + public static class Builder { private final Map map; diff --git a/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java b/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java index 11c2ed4d31af..9fabef54d66b 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java +++ b/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java @@ -23,12 +23,12 @@ import org.apache.cassandra.auth.AuthKeyspace; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.locator.MetaStrategy; import org.apache.cassandra.schema.DistributedSchema; import org.apache.cassandra.schema.Keyspaces; import org.apache.cassandra.schema.SystemDistributedKeyspace; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Transformation; +import org.apache.cassandra.tcm.sequences.LockedRanges; import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer; import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.tcm.transformations.ForceSnapshot; @@ -75,7 +75,7 @@ public Result execute(ClusterMetadata prev) ? setUpDistributedSystemKeyspaces(next) : next.schema.getKeyspaces()); ClusterMetadata.Transformer transformer = next.transformer().with(initialSchema); - return Transformation.success(transformer, MetaStrategy.affectedRanges(prev)); + return Transformation.success(transformer, LockedRanges.AffectedRanges.EMPTY); } public Keyspaces setUpDistributedSystemKeyspaces(ClusterMetadata next) diff --git a/src/java/org/apache/cassandra/tcm/transformations/cms/PreInitialize.java b/src/java/org/apache/cassandra/tcm/transformations/cms/PreInitialize.java index e07d1f1a2989..7286743f91d0 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/cms/PreInitialize.java +++ b/src/java/org/apache/cassandra/tcm/transformations/cms/PreInitialize.java @@ -52,11 +52,6 @@ private PreInitialize(InetAddressAndPort addr, String datacenter) this.datacenter = datacenter; } - public static PreInitialize forTesting() - { - return new PreInitialize(null, null); - } - public static PreInitialize blank() { return new PreInitialize(null, null); @@ -78,23 +73,17 @@ public Result execute(ClusterMetadata metadata) ClusterMetadata.Transformer transformer = metadata.transformer(); + // This null check is a leftover from previous implementations. In earlier versions the address and datacenter + // were not be included in the serialized form of this transform and so were not written to the local or + // distributed logs nor included in the log entries sent over the wire between peers. + // The null check remains to handle log entries written in that legacy format. The log entry which immediately + // follows PRE_INITIALIZE_CMS must contain an INITIALIZE_CMS transform, which will necessarily include the + // distributed metadata keyspace definition with the replication settings bootstrapped by PRE_INITIALIZE. This + // full ClusterMetadata becomes the starting point upon which further log entries are applied. The ultimate + // effect of this sequence is that once INITIALIZE_CMS has been committed to the log, the actual content of + // PRE_INITIALIZE_CMS becomes irrelevant. if (addr != null) { - // If addr != null, then this is being executed on the peer which is actually initializing the log - // for the very first time. - - // addr and datacenter are only used to bootstrap the replication of the distributed metatada - // keyspace on the first CMS node. They are never serialized into the distributed metadata log or - // passed to any other peer. - // - // PRE_INITIALIZE_CMS @ Epoch.FIRST, must be followed in the log by INITIALIZE_CMS @ (Epoch.FIRST + 1). - // The serialization of INITIALIZE_CMS includes the full ClusterMetadata at that point, which is - // obviously minimal, but will necessarily include the distributed metadata keyspace definition with - // the replication settings bootstrapped by PRE_INITIALIZE. This full ClusterMetadata becomes the - // starting point upon which further log entries are applied. So this means that once INITIALIZE_CMS - // has been committed to the log, the actual content of PRE_INITIALIZE_CMS is irrelevant, even on - // the first CMS node if it happens to replay it from its local storage after a restart. - DataPlacement.Builder dataPlacementBuilder = DataPlacement.builder(); Replica replica = new Replica(addr, MetaStrategy.partitioner.getMinimumToken(), @@ -107,11 +96,11 @@ public Result execute(ClusterMetadata metadata) dataPlacementBuilder.build()).build(); transformer.with(initialPlacement); - // re-initialise the schema distributed metadata keyspace so it gets the - // correct replication settings based on the DC of the initial CMS node + // create the distributed metadata keyspace in schema with replication settings based on the DC of the + // initial CMS node Keyspaces updated = metadata.schema.getKeyspaces() .withAddedOrReplaced(DistributedMetadataLogKeyspace.initialMetadata(datacenter)); - transformer.with(new DistributedSchema(updated)); + transformer.with(new DistributedSchema(updated, Epoch.FIRST)); } ClusterMetadata.Transformer.Transformed transformed = transformer.build(); @@ -121,9 +110,13 @@ public Result execute(ClusterMetadata metadata) return new Success(metadata, LockedRanges.AffectedRanges.EMPTY, transformed.modifiedKeys); } + @Override public String toString() { - return "PreInitialize"; + return "PreInitialize{" + + "addr=" + addr + + ", datacenter='" + datacenter + '\'' + + '}'; } public static class Serializer implements AsymmetricMetadataSerializer diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataSingleNodeUpgradeTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataSingleNodeUpgradeTest.java index 24cf020a7325..3d6c3cfd02c5 100644 --- a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataSingleNodeUpgradeTest.java +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataSingleNodeUpgradeTest.java @@ -26,6 +26,7 @@ import org.apache.cassandra.service.StorageService; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * @see org.apache.cassandra.tools.nodetool.CMSAdmin.InitializeCMS @@ -46,8 +47,17 @@ public void testSingleNodeUpgrade() throws Throwable cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); }) .runAfterClusterUpgrade((cluster) -> { - assertTrue(((IInvokableInstance)cluster.get(1)).callOnInstance(() -> StorageService.instance.getRangeToAddressMap("system_cluster_metadata").isEmpty())); + try + { + ((IInvokableInstance) cluster.get(1)).callOnInstance(() -> StorageService.instance.getRangeToAddressMap("system_cluster_metadata").isEmpty()); + fail(); + } + catch (Exception e) + { + assertTrue(e.getMessage().contains("Unknown keyspace system_cluster_metadata")); + } cluster.get(1).nodetoolResult("cms", "initialize").asserts().success(); + ((IInvokableInstance) cluster.get(1)).callOnInstance(() -> StorageService.instance.getRangeToAddressMap("system_cluster_metadata").isEmpty()); // make sure we can execute transformations: cluster.schemaChange(withKeyspace("ALTER TABLE %s.tbl with comment = 'hello123'")); }).run(); diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeInconsistentPeersV2Test.java b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeInconsistentPeersV2Test.java new file mode 100644 index 000000000000..9ced62767e30 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeInconsistentPeersV2Test.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.upgrade; + +import java.util.Map; + +import org.junit.Test; + +import org.apache.cassandra.distributed.Constants; +import org.apache.cassandra.distributed.UpgradeableCluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.IUpgradeableInstance; +import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.distributed.shared.NetworkTopology; +import org.apache.cassandra.locator.MetaStrategy; +import org.apache.cassandra.locator.NetworkTopologyStrategy; +import org.apache.cassandra.schema.ReplicationParams; +import org.apache.cassandra.tcm.ClusterMetadata; + +import static java.lang.String.format; +import static org.apache.cassandra.schema.SchemaConstants.METADATA_KEYSPACE_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ClusterMetadataUpgradeInconsistentPeersV2Test extends UpgradeTestBase +{ + @Test + public void upgradeWithInconsistentSystemPeersV2Test() throws Throwable + { + new TestCase() + .nodes(3) + .withNodeIdTopology(NetworkTopology.networkTopology(3, (i) -> NetworkTopology.dcAndRack("datacenter0" + i % 2, "rack0" + i))) + .nodesToUpgrade(1, 2, 3) + .withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP) + .set(Constants.KEY_DTEST_FULL_STARTUP, true)) + .upgradesToCurrentFrom(v50) + .setup((cluster) -> { + // insert mismatching entries into system.peers_v2 table + for (int i = 1; i <= 3; i++) + { + cluster.get(i).executeInternal(format("insert into system.peers_v2 (peer, peer_port, data_center) " + + "values ('10.10.10.10', 7000, 'a%s')", i)); + cluster.get(i).flush("system"); + } + }) + .runAfterClusterUpgrade((cluster) -> { + // The system_cluster_metadata keyspace shouldn't be created until the CMS is intialized + assertNoMetaKeyspace(cluster); + + cluster.get(3).nodetoolResult("cms", "initialize").asserts().success(); + + // post-initialization, the replication params for system_cluster_metadata should be RF 1 in the DC of the + // first CMS member - node3 / datacenter01 + Map actualReplication = Map.of("class", MetaStrategy.class.getName(), "datacenter01", "1"); + Map fromSystemTable = Map.of("class", NetworkTopologyStrategy.class.getName(), "datacenter01", "1"); + assertReplicationParams(cluster, actualReplication, fromSystemTable); + }).run(); + } + + private static void assertNoMetaKeyspace(UpgradeableCluster cluster) + { + for (IUpgradeableInstance inst : cluster) + { + IInvokableInstance i = (IInvokableInstance) inst; + boolean found = i.callOnInstance(() -> ClusterMetadata.current().schema.getKeyspaces().containsKeyspace(METADATA_KEYSPACE_NAME)); + assertFalse("Metadata keyspace present on node" + i + " when it should not be", found); + + SimpleQueryResult res = inst.executeInternalWithResult("select replication from system_schema.keyspaces " + + "where keyspace_name = ?", + METADATA_KEYSPACE_NAME); + assertFalse(res.hasNext()); + } + } + + private static void assertReplicationParams(UpgradeableCluster cluster, + Map expectedActual, + Map expectedInSystemTable) + { + for (IUpgradeableInstance inst : cluster) + { + IInvokableInstance i = (IInvokableInstance) inst; + Map rs = i.callOnInstance(() -> { + ReplicationParams r = ClusterMetadata.current().schema.getKeyspaceMetadata("system_cluster_metadata").params.replication; + return r.asMap(); + }); + assertEquals(rs, expectedActual); + + SimpleQueryResult res = inst.executeInternalWithResult("select replication from system_schema.keyspaces " + + "where keyspace_name = ?", + METADATA_KEYSPACE_NAME); + assertTrue(res.hasNext()); + Map replication = res.next().get("replication"); + assertEquals(replication, expectedInSystemTable); + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeTest.java index c7e33b822837..7f590379afad 100644 --- a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeTest.java +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeTest.java @@ -56,9 +56,9 @@ public void simpleUpgradeTest() throws Throwable }) .runAfterClusterUpgrade((cluster) -> { Object [][] r = cluster.get(1).executeInternal("select keyspace_name from system_schema.keyspaces where keyspace_name='system_cluster_metadata'"); - assertEquals(1, r.length); + assertEquals(0, r.length); r = cluster.get(1).executeInternal("select table_name from system_schema.tables where keyspace_name='system_cluster_metadata' and table_name='distributed_metadata_log'"); - assertEquals(1, r.length); + assertEquals(0, r.length); cluster.get(1).nodetoolResult("cms","initialize").asserts().success(); cluster.forEach(i -> @@ -67,6 +67,12 @@ public void simpleUpgradeTest() throws Throwable assertFalse("node " + i.config().num() + " is still in MIGRATING STATE", ClusterUtils.isMigrating((IInvokableInstance) i)); }); + + r = cluster.get(1).executeInternal("select keyspace_name from system_schema.keyspaces where keyspace_name='system_cluster_metadata'"); + assertEquals(1, r.length); + r = cluster.get(1).executeInternal("select table_name from system_schema.tables where keyspace_name='system_cluster_metadata' and table_name='distributed_metadata_log'"); + assertEquals(1, r.length); + cluster.get(2).nodetoolResult("cms", "reconfigure", "3").asserts().success(); cluster.schemaChange(withKeyspace("create table %s.xyz (id int primary key)")); cluster.forEach(i -> { diff --git a/test/unit/org/apache/cassandra/ServerTestUtils.java b/test/unit/org/apache/cassandra/ServerTestUtils.java index ebc5bf1df57e..ea6c53003c3c 100644 --- a/test/unit/org/apache/cassandra/ServerTestUtils.java +++ b/test/unit/org/apache/cassandra/ServerTestUtils.java @@ -274,13 +274,7 @@ public static EmbeddedCassandraService startEmbeddedCassandraService() throws IO public static void initCMS() { - // Effectively disable automatic snapshots using AtomicLongBackedProcessor and LocaLLog.Sync interacts - // badly with submitting SealPeriod transformations from the log listener. In this configuration, SealPeriod - // commits performed on NonPeriodicTasks threads end up actually performing the transformations as well as - // calling the pre and post commit listeners, which is not threadsafe. In a non-test setup the processing of - // log entries is always done by the dedicated log follower thread. DatabaseDescriptor.setMetadataSnapshotFrequency(Integer.MAX_VALUE); - IPartitioner partitioner = DatabaseDescriptor.getPartitioner(); Location location = DatabaseDescriptor.getLocator().local(); boolean addListeners = true; @@ -304,7 +298,7 @@ public static void initCMS() ClusterMetadataService.setInstance(service); log.readyUnchecked(); - log.bootstrap(FBUtilities.getBroadcastAddressAndPort(), location.datacenter); + log.bootstrap(FBUtilities.getBroadcastAddressAndPort(), location.datacenter, (p) -> {}); service.commit(new Initialize(ClusterMetadata.current())); QueryProcessor.registerStatementInvalidatingListener(); service.mark(); diff --git a/test/unit/org/apache/cassandra/schema/DistributedSchemaTest.java b/test/unit/org/apache/cassandra/schema/DistributedSchemaTest.java index 4b74f5c38b6e..f5abb312f052 100644 --- a/test/unit/org/apache/cassandra/schema/DistributedSchemaTest.java +++ b/test/unit/org/apache/cassandra/schema/DistributedSchemaTest.java @@ -19,14 +19,12 @@ package org.apache.cassandra.schema; import java.util.HashMap; -import java.util.Set; import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.ServerTestUtils; import org.apache.cassandra.auth.AuthKeyspace; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.service.reads.PercentileSpeculativeRetryPolicy; import org.apache.cassandra.service.reads.repair.ReadRepairStrategy; @@ -64,8 +62,7 @@ public void fromSystemTablesPreservesExistingSchemaProperties() KeyspaceParams.simple(3), Tables.of(modifiedTable, nonstandard)); - DistributedSchema schema = DistributedSchema.fromSystemTables(Keyspaces.of(km), - Set.of(DatabaseDescriptor.getLocalDataCenter())); + DistributedSchema schema = DistributedSchema.fromSystemTables(Keyspaces.of(km)); KeyspaceMetadata merged = schema.getKeyspaceMetadata(SchemaConstants.AUTH_KEYSPACE_NAME); assertThat(merged.getTableOrViewNullable(AuthKeyspace.ROLES)).isEqualTo(modifiedTable); assertThat(merged.getTableOrViewNullable("nonstandard")).isEqualTo(nonstandard); diff --git a/test/unit/org/apache/cassandra/tcm/log/DistributedLogStateTest.java b/test/unit/org/apache/cassandra/tcm/log/DistributedLogStateTest.java index 37cee7237347..a4c884406c68 100644 --- a/test/unit/org/apache/cassandra/tcm/log/DistributedLogStateTest.java +++ b/test/unit/org/apache/cassandra/tcm/log/DistributedLogStateTest.java @@ -35,6 +35,7 @@ import org.apache.cassandra.tcm.MetadataSnapshots; import org.apache.cassandra.tcm.transformations.CustomTransformation; import org.apache.cassandra.tcm.transformations.TriggerSnapshot; +import org.apache.cassandra.tcm.transformations.cms.PreInitialize; import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; import static org.apache.cassandra.db.ColumnFamilyStore.FlushReason.UNIT_TESTS; @@ -76,8 +77,12 @@ public void cleanup() } @Override - public void insertRegularEntry() + public void insertRegularEntry() throws IOException { + + if (currentEpoch == Epoch.FIRST) + DistributedMetadataLogKeyspace.insertPreInitialize(PreInitialize.blank()); + nextEpoch = currentEpoch.nextEpoch(); boolean applied = DistributedMetadataLogKeyspace.tryCommit(new Entry.Id(currentEpoch.getEpoch()), CustomTransformation.make((int) currentEpoch.getEpoch()), diff --git a/test/unit/org/apache/cassandra/tcm/log/LogListenerNotificationTest.java b/test/unit/org/apache/cassandra/tcm/log/LogListenerNotificationTest.java index f37313ac4a73..e4105b8e5fab 100644 --- a/test/unit/org/apache/cassandra/tcm/log/LogListenerNotificationTest.java +++ b/test/unit/org/apache/cassandra/tcm/log/LogListenerNotificationTest.java @@ -101,7 +101,7 @@ public void notify(Entry entry, Transformation.Result result) .withLogListener(listener) .createLog(); log.readyUnchecked(); - log.append(new Entry(Entry.Id.NONE, Epoch.FIRST, PreInitialize.forTesting())); + log.append(new Entry(Entry.Id.NONE, Epoch.FIRST, PreInitialize.blank())); log.append(input); } diff --git a/test/unit/org/apache/cassandra/utils/CassandraGenerators.java b/test/unit/org/apache/cassandra/utils/CassandraGenerators.java index e3d401774495..4c0a6758ec0b 100644 --- a/test/unit/org/apache/cassandra/utils/CassandraGenerators.java +++ b/test/unit/org/apache/cassandra/utils/CassandraGenerators.java @@ -1946,7 +1946,7 @@ public Gen build() Epoch epoch = epochGen.generate(rnd); IPartitioner partitioner = partitionerGen.generate(rnd); Directory directory = Directory.EMPTY; - DistributedSchema schema = DistributedSchema.first(directory.knownDatacenters()); + DistributedSchema schema = DistributedSchema.empty(); TokenMap tokenMap = new TokenMap(partitioner); DataPlacements placements = DataPlacements.EMPTY; AccordFastPath accordFastPath = accordFastPathGen.generate(rnd); From a15751ef6a80ab1f231e36ce237ec02618b64289 Mon Sep 17 00:00:00 2001 From: Sam Tunnicliffe Date: Wed, 15 Apr 2026 09:21:18 +0100 Subject: [PATCH 3/6] [CASSANDRA-21477] Update Host ids in the system.peers_v2 table automatically during CMS initialization Following the commit of the INITIALIZE_CMS transformation, make sure that all known peers are updated in the legacy peers tables. Co-authored-by: Marcus Eriksson --- .../tcm/listeners/LegacyStateListener.java | 7 +- .../apache/cassandra/tcm/log/LocalLog.java | 15 ++- ...lusterMetadataUpgradePeersHostIdsTest.java | 109 ++++++++++++++++++ 3 files changed, 126 insertions(+), 5 deletions(-) create mode 100644 test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradePeersHostIdsTest.java diff --git a/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java b/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java index 52606dc115ba..d8b9dd746e72 100644 --- a/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java +++ b/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java @@ -41,6 +41,7 @@ import org.apache.cassandra.schema.Schema; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.MultiStepOperation; import org.apache.cassandra.tcm.compatibility.GossipHelper; import org.apache.cassandra.tcm.membership.Directory; @@ -73,8 +74,12 @@ public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next, boolean Set changed = new HashSet<>(); for (NodeId node : next.directory.peerIds()) { - if (directoryEntryChangedFor(node, prev.directory, next.directory) || !prev.tokenMap.tokens(node).equals(next.tokenMap.tokens(node))) + if (prev.epoch.isEqualOrBefore(Epoch.FIRST) + || directoryEntryChangedFor(node, prev.directory, next.directory) + || !prev.tokenMap.tokens(node).equals(next.tokenMap.tokens(node))) + { changed.add(node); + } } // next.myNodeId() can be null during replay (before we have registered) but if it is present and diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java b/src/java/org/apache/cassandra/tcm/log/LocalLog.java index cd6fb0912902..9e283b324491 100644 --- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java +++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java @@ -478,8 +478,15 @@ void processPendingInternal() ClusterMetadata prev = committed.get(); // ForceSnapshot + Bootstrap entries can "jump" epoch - boolean isPreInit = pendingEntry.transform.kind() == Transformation.Kind.PRE_INITIALIZE_CMS; - boolean isSnapshot = pendingEntry.transform.kind() == Transformation.Kind.FORCE_SNAPSHOT; + Transformation.Kind kind = pendingEntry.transform.kind(); + boolean isPreInit = kind == Transformation.Kind.PRE_INITIALIZE_CMS; + boolean isInit = kind == Transformation.Kind.INITIALIZE_CMS; + boolean isSnapshot = kind == Transformation.Kind.FORCE_SNAPSHOT ; + // only a PRE_INITIALIZE_CMS or a snapshot is allowed to skip over gaps. + // Note: INITIALIZE_CMS is not allowed to do this as during upgrades it can allow us to jump over the + // PRE_INITIALIZE_CMS entry if the INITIALIZE_CMS is received first. This then creates a gap at Epoch.FIRST + // which can never be resolved. In turn, that makes it impossible to build a LogState for replay purposes + // with a correct and consecutive set of entries if the node is bounced before applying a later snapshot. if (pendingEntry.epoch.isDirectlyAfter(prev.epoch) || ((isPreInit || isSnapshot) && pendingEntry.epoch.isAfter(prev.epoch))) { @@ -515,7 +522,7 @@ void processPendingInternal() if (replayComplete.get() && pendingEntry.transform.kind() != Transformation.Kind.FORCE_SNAPSHOT) storage.append(pendingEntry.maybeUnwrapExecuted()); - notifyPreCommit(prev, next, isSnapshot); + notifyPreCommit(prev, next, isSnapshot || isInit); if (committed.compareAndSet(prev, next)) { @@ -531,7 +538,7 @@ void processPendingInternal() next.epoch, prev.epoch, metadata().epoch)); } - notifyPostCommit(prev, next, isSnapshot); + notifyPostCommit(prev, next, isSnapshot || isInit); } catch (StopProcessingException t) { diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradePeersHostIdsTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradePeersHostIdsTest.java new file mode 100644 index 000000000000..727166f29636 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradePeersHostIdsTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.upgrade; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import org.junit.Test; + +import org.apache.cassandra.distributed.Constants; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IUpgradeableInstance; +import org.apache.cassandra.distributed.api.Row; +import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.distributed.impl.TestEndpointCache; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.tcm.membership.NodeId; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class ClusterMetadataUpgradePeersHostIdsTest extends UpgradeTestBase +{ + @Test + public void upgradeHostIdUpdateTest() throws Throwable + { + Map preUpgradeIDs = new HashMap<>(); + new TestCase() + .nodes(3) + .nodesToUpgrade(1, 2, 3) + .withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP) + .set(Constants.KEY_DTEST_FULL_STARTUP, true)) + .upgradesToCurrentFrom(v50) + .setup((cluster) -> { + // combine the system.peers_v2 entries from each instance pre-upgrade + for (int i = 1; i <= 3; i++) + preUpgradeIDs.putAll(getHostIdsFromSystemPeersV2(cluster.get(i))); + + assertEquals(3, preUpgradeIDs.size()); + + for (UUID hostId : preUpgradeIDs.values()) + assertFalse(NodeId.isValidNodeId(hostId)); + }) + .runAfterClusterUpgrade((cluster) -> { + for (int i = 1; i <= 3; i++) + { + // system.peers/peers_v2 should still contain the pre-upgrade ids + IUpgradeableInstance inst = cluster.get(i); + Map expected = new HashMap<>(preUpgradeIDs); + expected.remove(TestEndpointCache.toCassandraInetAddressAndPort(inst.config().broadcastAddress())); + assertEquals(expected, getHostIdsFromSystemPeersV2(cluster.get(i))); + } + + // initialize the CMS and fetch new ids from the ClusterMetadata Directory + cluster.get(1).nodetoolResult("cms", "initialize").asserts().success(); + Map postUpgradeIDs = getHostIdsFromClusterMetadata(cluster.get(1)); + + for (int i = 1; i <= 3; i++) + { + // assert system.peers/peers_v2 now contain the post-upgrade ids + IUpgradeableInstance inst = cluster.get(i); + Map expected = new HashMap<>(postUpgradeIDs); + expected.remove(TestEndpointCache.toCassandraInetAddressAndPort(inst.config().broadcastAddress())); + assertEquals(expected, getHostIdsFromSystemPeersV2(cluster.get(i))); + } + }).run(); + } + + private static Map getHostIdsFromSystemPeersV2(IUpgradeableInstance instance) + { + Map hostIds = new HashMap<>(); + SimpleQueryResult res = instance.executeInternalWithResult("select peer, peer_port, host_id from system.peers_v2"); + while(res.hasNext()) + { + Row row = res.next(); + hostIds.put(InetAddressAndPort.getByAddressOverrideDefaults(row.get(0), row.getInteger(1)), row.getUUID(2)); + } + return hostIds; + } + + private static Map getHostIdsFromClusterMetadata(IUpgradeableInstance instance) + { + Map hostIds = new HashMap<>(); + SimpleQueryResult res = instance.executeInternalWithResult("select broadcast_address, broadcast_port, host_id from system_views.cluster_metadata_directory"); + while(res.hasNext()) + { + Row row = res.next(); + hostIds.put(InetAddressAndPort.getByAddressOverrideDefaults(row.get(0), row.getInteger(1)), row.getUUID(2)); + } + return hostIds; + } +} From 3a5c2463767082a13787ee7cbfb4034d8edd608d Mon Sep 17 00:00:00 2001 From: Sam Tunnicliffe Date: Fri, 8 May 2026 12:25:31 +0100 Subject: [PATCH 4/6] [CASSANDRA-21477] Improve clean up after a failed CMS initialization * Clean up CMS initialization errors which occur after the PreInitialize stage * Initialise gossip/local host id after CMS initialization completes Co-authored-by: Marcus Eriksson --- .../org/apache/cassandra/tcm/Startup.java | 8 +- .../listeners/UpgradeMigrationListener.java | 17 +- .../migration/CMSInitializationException.java | 27 +++ .../cassandra/tcm/migration/Election.java | 20 +++ ...rMetadataUpgradeUnexpectedFailureTest.java | 164 +++++++++++++++--- 5 files changed, 206 insertions(+), 30 deletions(-) create mode 100644 src/java/org/apache/cassandra/tcm/migration/CMSInitializationException.java diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java index 175769798143..57169d98768f 100644 --- a/src/java/org/apache/cassandra/tcm/Startup.java +++ b/src/java/org/apache/cassandra/tcm/Startup.java @@ -65,6 +65,7 @@ import org.apache.cassandra.tcm.log.SystemKeyspaceStorage; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeState; +import org.apache.cassandra.tcm.migration.CMSInitializationException; import org.apache.cassandra.tcm.migration.CMSInitializationRequest; import org.apache.cassandra.tcm.migration.Election; import org.apache.cassandra.tcm.ownership.UniformRangePlacement; @@ -154,7 +155,12 @@ public static void initializeAsFirstCMSNode() ClusterMetadata metadata = ClusterMetadata.current(); assert ClusterMetadataService.state() == LOCAL : String.format("Can't initialize as node hasn't transitioned to CMS state. State: %s.\n%s", ClusterMetadataService.state(), metadata); Initialize initialize = new Initialize(metadata.initializeClusterIdentifier(addr.hashCode())); - ClusterMetadataService.instance().commit(initialize); + ClusterMetadataService.instance().commit(initialize, + m -> { logger.info("INITIALIZE_CMS committed successfully"); return m;}, + (code, message) -> { + logger.info("INITIALIZE_CMS commit failure: ({}) {}", code, message); + throw new CMSInitializationException(); + }); } public static void initializeAsNonCmsNode(Function wrapProcessor) throws StartupException diff --git a/src/java/org/apache/cassandra/tcm/listeners/UpgradeMigrationListener.java b/src/java/org/apache/cassandra/tcm/listeners/UpgradeMigrationListener.java index e66e36a4d2ad..feb191d32a00 100644 --- a/src/java/org/apache/cassandra/tcm/listeners/UpgradeMigrationListener.java +++ b/src/java/org/apache/cassandra/tcm/listeners/UpgradeMigrationListener.java @@ -21,9 +21,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.utils.CassandraVersion; /** @@ -42,11 +44,20 @@ public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next, boolean { if (prev.epoch.equals(Epoch.UPGRADE_GOSSIP)) { - logger.info("Detected upgrade from gossip mode, updating my host id in gossip to {}", next.myNodeId()); - Gossiper.instance.mergeNodeToGossip(next.myNodeId(), next); + logger.info("Detected upgrade from gossip mode"); + return; + } + else if (prev.epoch.equals(Epoch.FIRST) && !next.directory.isEmpty()) // directory is non-empty after initialization during gossip upgrade + { + NodeId localId = next.myNodeId(); + if (localId != null) + { + logger.info("Initialized CMS, updating local host id to {}", next.myNodeId()); + SystemKeyspace.setLocalHostId(next.myNodeId().toUUID()); + Gossiper.instance.mergeNodeToGossip(next.myNodeId(), next); + } if (Gossiper.instance.getQuarantineDisabled()) Gossiper.instance.clearQuarantinedEndpoints(); - return; } CassandraVersion prevMinVersion = prev.directory.clusterMinVersion.cassandraVersion; diff --git a/src/java/org/apache/cassandra/tcm/migration/CMSInitializationException.java b/src/java/org/apache/cassandra/tcm/migration/CMSInitializationException.java new file mode 100644 index 000000000000..c0a1629d784d --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/migration/CMSInitializationException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.migration; + +public class CMSInitializationException extends RuntimeException +{ + public CMSInitializationException() + { + super("CMS initialization failed, see logs for details"); + } +} diff --git a/src/java/org/apache/cassandra/tcm/migration/Election.java b/src/java/org/apache/cassandra/tcm/migration/Election.java index 34ce077583a7..906b647488b7 100644 --- a/src/java/org/apache/cassandra/tcm/migration/Election.java +++ b/src/java/org/apache/cassandra/tcm/migration/Election.java @@ -33,6 +33,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.IVerbHandler; @@ -41,8 +43,11 @@ import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.Verb; import org.apache.cassandra.schema.DistributedMetadataLogKeyspace; +import org.apache.cassandra.schema.Keyspaces; +import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.SchemaKeyspace; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.Startup; import org.apache.cassandra.tcm.membership.Directory; @@ -87,6 +92,8 @@ private Election(MessageDelivery messaging) public void nominateSelf(Set candidates, Set ignoredEndpoints, ClusterMetadata metadata, boolean verifyAllPeersMetadata) { + // Note: this is probably identical to the supplied metadata, but not guaranteed to be + ClusterMetadata priorState = ClusterMetadata.current(); Set sendTo = new HashSet<>(candidates); sendTo.removeAll(ignoredEndpoints); sendTo.remove(FBUtilities.getBroadcastAddressAndPort()); @@ -102,6 +109,19 @@ public void nominateSelf(Set candidates, Set mutations = SchemaKeyspace.convertSchemaDiffToMutations(diff, FBUtilities.timestampMicros()); + SchemaKeyspace.applyChanges(mutations); + + ClusterMetadataService.instance().log().unsafeSetCommittedFromGossip(priorState); throw e; } } diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeUnexpectedFailureTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeUnexpectedFailureTest.java index 646d4b8222a6..a975465bc2ca 100644 --- a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeUnexpectedFailureTest.java +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeUnexpectedFailureTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.distributed.upgrade; -import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -30,39 +29,125 @@ import org.junit.Test; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.distributed.Constants; import org.apache.cassandra.distributed.UpgradeableCluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.IUpgradeableInstance; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.tcm.migration.Election; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tcm.log.LocalLog; +import org.apache.cassandra.tcm.migration.CMSInitializationException; +import org.apache.cassandra.utils.Shared; import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.apache.cassandra.schema.SchemaConstants.METADATA_KEYSPACE_NAME; +import static org.apache.cassandra.schema.SchemaConstants.SCHEMA_KEYSPACE_NAME; +import static org.apache.cassandra.schema.SchemaKeyspaceTables.COLUMNS; +import static org.apache.cassandra.schema.SchemaKeyspaceTables.KEYSPACES; +import static org.apache.cassandra.schema.SchemaKeyspaceTables.TABLES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; public class ClusterMetadataUpgradeUnexpectedFailureTest extends UpgradeTestBase { @Test - public void upgradeFailsUnexpectedlyAfterInitiation() throws Throwable + public void upgradeFailsUnexpectedlyBeforePreInitialize() throws Throwable { + testCMSInitializationError(true, false, "Something unexpected went wrong"); + } + + @Test + public void upgradeFailsUnexpectedlyBeforeInitialize() throws Throwable + { + testCMSInitializationError(false, true, "CMS initialization failed, see logs for details"); + } + + private void testCMSInitializationError(boolean failBeforePreInit, + boolean failAfterPreInit, + String expectedError) throws Throwable + { + BBState.failBeforePreInitialize.set(failBeforePreInit); + BBState.failAfterPreInitialize.set(failAfterPreInit); + Consumer builderUpdater = builder -> builder.withInstanceInitializer(BBInstaller::installUpgradeVersionBB); new TestCase() - .nodes(3) - .nodesToUpgrade(1, 2, 3) - .withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP) - .set(Constants.KEY_DTEST_FULL_STARTUP, true)) - .upgradesToCurrentFrom(v41) - .withBuilder(builderUpdater) - .setup((cluster) -> { - cluster.schemaChange(withKeyspace("ALTER KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor':2}")); - cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); - }) - .runAfterClusterUpgrade((cluster) -> { - // we injected a BB helper to trigger an unexpected failure on the first attempt at initialization. - // i.e. an exception that must be caught as opposed to a mismatch in metadata or down peer. - cluster.get(1).nodetoolResult("cms", "initialize").asserts().failure().errorContains("Something unexpected went wrong"); - // handling the failure should have included cleaning up any state so that another attempt can be - // made, which this time should succeed. - cluster.get(1).nodetoolResult("cms", "initialize").asserts().success(); - }).run(); + .nodes(3) + .nodesToUpgrade(1, 2, 3) + .withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP) + .set(Constants.KEY_DTEST_FULL_STARTUP, true)) + .upgradesToCurrentFrom(v50) + .withBuilder(builderUpdater) + .setup((cluster) -> { + cluster.schemaChange(withKeyspace("ALTER KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor':2}")); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + for (int i = 0; i < 10; i++) + { + cluster.get(1).coordinator().execute("INSERT into " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, ?, ?)", + ConsistencyLevel.ALL, + i, i, i); + } + }) + .runAfterClusterUpgrade((cluster) -> { + String oldHostId = getHostId(cluster.get(1)); + // we injected a BB helper to trigger an unexpected failure on the first attempt at initialization. + // i.e. an exception that must be caught as opposed to a mismatch in metadata or down peer. + cluster.get(1).nodetoolResult("cms", "initialize").asserts().failure().errorContains(expectedError); + assertEquals(oldHostId, getHostId(cluster.get(1))); + + // handling the failure should have included cleaning up any state so that another attempt can be + // made, which this time should succeed. + for(IUpgradeableInstance inst : cluster) + inst.nodetoolResult("cms").asserts().success().stdoutContains("Service State: GOSSIP"); + // Basic smoke test + for (int i = 0; i < 10; i++) + { + Object [][] rows = cluster.get(1) + .coordinator() + .execute("SELECT v from " + KEYSPACE + ".tbl WHERE pk = ?", + ConsistencyLevel.ALL, i); + assertEquals(1, rows.length); + assertEquals(i, rows[0][0]); + } + + // Make sure that no trace of the metadata keyspace is present after the CMS initialization failure + assertSchemaTablesContent(cluster, true); + + Object[][] rows = cluster.get(1).coordinator().execute("DESCRIBE FULL SCHEMA", ConsistencyLevel.NODE_LOCAL); + for (Object[] row : rows) + assertFalse(row[0].toString().equalsIgnoreCase(METADATA_KEYSPACE_NAME)); + + assertEquals(oldHostId, getHostId(cluster.get(1))); + // A subsequent initialization should succeed + cluster.get(1).nodetoolResult("cms", "initialize").asserts().success(); + assertNotEquals(oldHostId, getHostId(cluster.get(1))); + assertSchemaTablesContent(cluster, false); + }).run(); + } + + private static void assertSchemaTablesContent(UpgradeableCluster cluster, boolean expectEmpty) + { + for (String schemaTable : new String[] { KEYSPACES, TABLES, COLUMNS }) + { + Object[][] rows = cluster.get(1) + .coordinator() + .execute("SELECT * FROM " + SCHEMA_KEYSPACE_NAME + "." + schemaTable + " WHERE keyspace_name = ?" , + ConsistencyLevel.ALL, METADATA_KEYSPACE_NAME); + if (expectEmpty) + assertEquals(0, rows.length); + else + assertTrue(rows.length >= 1); + } + } + + private static String getHostId(IUpgradeableInstance i) + { + return ((IInvokableInstance)i).callOnInstance(() -> SystemKeyspace.getLocalHostId().toString()); } public static class BBInstaller @@ -71,8 +156,15 @@ public static void installUpgradeVersionBB(ClassLoader classLoader, Integer num) { try { - new ByteBuddy().rebase(Election.class) - .method(named("finish")) + // Fail before the LocalLog is initialized with the PRE_INITIALIZE_CMS + new ByteBuddy().rebase(LocalLog.class) + .method(named("bootstrap")) + .intercept(MethodDelegation.to(BBInterceptor.class)) + .make() + .load(classLoader, ClassLoadingStrategy.Default.INJECTION); + // Fail after the PRE_INITIALIZE_CMS has been enacted, as the INITIALIZE_CMS is being committed + new ByteBuddy().rebase(ClusterMetadataService.class) + .method(named("commit")) .intercept(MethodDelegation.to(BBInterceptor.class)) .make() .load(classLoader, ClassLoadingStrategy.Default.INJECTION); @@ -88,14 +180,20 @@ public static void installUpgradeVersionBB(ClassLoader classLoader, Integer num) } } + @Shared + public static class BBState + { + public static AtomicBoolean failBeforePreInitialize = new AtomicBoolean(false); + public static AtomicBoolean failAfterPreInitialize = new AtomicBoolean(false); + } + public static class BBInterceptor { - private static final AtomicBoolean firstAttempt = new AtomicBoolean(true); @SuppressWarnings("unused") - public static void finish(Set sendTo, @SuperCall Callable zuper) + public static void bootstrap(InetAddressAndPort addr, String datacenterSet, @SuperCall Callable zuper) { - if (firstAttempt.getAndSet(false)) + if (BBState.failBeforePreInitialize.getAndSet(false)) throw new IllegalStateException("Something unexpected went wrong"); try @@ -107,5 +205,19 @@ public static void finish(Set sendTo, @SuperCall Callable zuper) + { + if (BBState.failAfterPreInitialize.getAndSet(false)) + throw new CMSInitializationException(); + try + { + return zuper.call(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } } } From b3dff3e035bf58b69ee238591c76f6a4f0c9a64f Mon Sep 17 00:00:00 2001 From: Sam Tunnicliffe Date: Fri, 26 Jun 2026 13:52:09 +0100 Subject: [PATCH 5/6] Don't sync host ids in ClusterMetadata and system tables before CMS initialization --- src/java/org/apache/cassandra/db/SystemPeersValidator.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/java/org/apache/cassandra/db/SystemPeersValidator.java b/src/java/org/apache/cassandra/db/SystemPeersValidator.java index 2f490a4bb4ed..21dbd4b305fd 100644 --- a/src/java/org/apache/cassandra/db/SystemPeersValidator.java +++ b/src/java/org/apache/cassandra/db/SystemPeersValidator.java @@ -34,6 +34,7 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.membership.Location; import org.apache.cassandra.tcm.membership.NodeAddresses; import org.apache.cassandra.tcm.membership.NodeId; @@ -68,6 +69,8 @@ public final class SystemPeersValidator public static void validateAndRepair(ClusterMetadata metadata) { + if (metadata.epoch.isBefore(Epoch.FIRST)) + return; Map peersV2Rows = getPeersV2Rows(); Map peersRows = getPeersRows(); From 671f9872d7a87159fd462613eb625ca64d6cd2eb Mon Sep 17 00:00:00 2001 From: Sam Tunnicliffe Date: Fri, 26 Jun 2026 18:40:11 +0100 Subject: [PATCH 6/6] Ignore failing test --- .../distributed/upgrade/ClusterMetadataUpgradeHarryTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeHarryTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeHarryTest.java index 64810654d5cf..81df43ca8ed8 100644 --- a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeHarryTest.java +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeHarryTest.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import org.junit.Ignore; import org.junit.Test; import org.apache.cassandra.concurrent.Interruptible; @@ -55,6 +56,7 @@ public class ClusterMetadataUpgradeHarryTest extends UpgradeTestBase { @Test + @Ignore("CASSANDRA-21473") public void simpleUpgradeTest() { AtomicReference executor = new AtomicReference<>();