CASSANALYTICS-24 and CASSANALYTICS-25 SSTable version based bridge determination#169
CASSANALYTICS-24 and CASSANALYTICS-25 SSTable version based bridge determination#169skoppu22 wants to merge 10 commits into
Conversation
sarankk
left a comment
There was a problem hiding this comment.
Thanks Shailaja, took a first pass. Reviewed half the files.
| this.name = name; | ||
| this.jarBaseName = jarBaseName; | ||
| this.sstableFormats = new HashSet<>(Arrays.asList(sstableFormats)); | ||
| this.nativeSStableVersions = Collections.unmodifiableList(Arrays.asList(nativeSStableVersions)); |
There was a problem hiding this comment.
Nit
| this.nativeSStableVersions = Collections.unmodifiableList(Arrays.asList(nativeSStableVersions)); | |
| this.nativeSStableVersions = List.of(nativeSStableVersions); |
| THREEZERO(30, "3.0", "three-zero", new String[]{"big"}, | ||
| new String[]{ | ||
| // Cassandra 3.x native sstable versions | ||
| // order is important, used to determine the latest version |
There was a problem hiding this comment.
Array based ordering will be hard to debug in case of issues and hard to maintain as well.
There was a problem hiding this comment.
After reading SSTableVersionAnalyzer we could do away without the internal ordering between the sstable versions. Since we are still interested in the associated cassandra version. We could treat the sstable versions within the Cassandra version on same level. Wdyt?
There was a problem hiding this comment.
Yes, regardless of which sstable version found in a given Cassandra version, we choose that cassandra version as long as there is no sstable version from higher cassandra version. So the order of sstable versions within a cassandra version doesn't matter. Updated the sorting logic not to depend on array index.
|
|
||
| /** | ||
| * Analyzes SSTable versions on a cluster to determine the appropriate | ||
| * Cassandra bridge to load for bulk write operations. |
There was a problem hiding this comment.
We are using version analyzer for both read and write
| * Cassandra bridge to load for bulk write operations. | |
| * Cassandra bridge to load for bulk read/write operations. |
| if (!v2Opt.isPresent()) | ||
| { | ||
| throw new IllegalArgumentException( | ||
| String.format("Unknown SSTable version: %s. Cannot determine Cassandra version.", v2)); |
There was a problem hiding this comment.
Instead of throwing even if one format is incorrect, we could log these as errors and throw in the end if no max version is found.
There was a problem hiding this comment.
We should error and abort job if encountered unrecognized sstable versions. We can log error and ask the user to run the job using fallback mechanism if the user thinks it is safe to run the job. I added error to log this.
|
|
||
| // Calculate previous major version: (majorVersion - 1) * 10 | ||
| // E.g., 5 -> 40, 4 -> 30, 3 -> 20 | ||
| return (majorVersion - 1) * 10; |
There was a problem hiding this comment.
Nit: How about we keep an ordered list of CassandraVersion enums and return the previous one, instead of computing the version.
There was a problem hiding this comment.
C* 5.0 supports can read C* 4.0, 4.1, 4.2 ...
C* 4.x can read C* 3.0, 3.1, ..
We cannot add all minor versions and won't be able to keep up with C* release cycle if we do so. Also this PR goal is to make C* analytics independent of C* version numbers. Hence dynamically calculating this makes analytics independent of C* releases.
| void testGetSSTableVersionIndexValidVersion() | ||
| { | ||
| int index = CassandraVersion.FOURZERO.getSSTableVersionIndex("big-na"); | ||
| assertThat(index).isEqualTo(0); |
There was a problem hiding this comment.
We could avoid the index checks if we treat the SSTable versions within a Cassandra version on the same level.
| public class SSTableVersionAnalyzerTest | ||
| { | ||
| @Test | ||
| void testDetermineBridgeVersionForWriteFallbackDisabledSingleVersion() |
There was a problem hiding this comment.
Nit: Could we make these tests parameterized, passing in expected result as well?
| if (sstableVersionsOnCluster == null || sstableVersionsOnCluster.isEmpty()) | ||
| { | ||
| throw new IllegalStateException(String.format( | ||
| "Unable to retrieve SSTable versions from cluster. " + |
There was a problem hiding this comment.
Why not support fallback here and return Cassandra version based bridge?
There was a problem hiding this comment.
Here we are asking user to run the job using fallback mechanism if they thinks that's appropriate. We do not use fallback mechanism when spark.cassandra_analytics.bridge.disable_sstable_version_based is false. There must be a reason why sstable version based bridge selection is failing. User can evaluate the cluster state and try with fallback mechanism only if they think that's fine for their use case or cluster situation.
| Set<SidecarInstance> instances) | ||
| { | ||
| return instances.stream() | ||
| .map(instance -> client |
| * @param instances all Sidecar instances | ||
| * @return completable futures with GossipInfoResponse | ||
| */ | ||
| public static List<CompletableFuture<GossipInfoResponse>> gossipInfoFromAllNodes(SidecarClient client, |
There was a problem hiding this comment.
Not for addressing in this PR, but would be good to have a cluster wide gossip call available in Sidecar.
| return CassandraClusterInfo.getLowestCassandraVersion(conf, null); | ||
| } | ||
|
|
||
| @Override | ||
| protected Set<String> getSSTableVersionsOnCluster(@NotNull BulkSparkConf conf) | ||
| { | ||
| return CassandraClusterInfo.getSSTableVersionsOnCluster(conf, null); | ||
| } | ||
|
|
||
| @Override | ||
| protected ClusterInfo buildClusterInfo(CassandraVersion bridgeVersion) | ||
| { | ||
| return new CassandraClusterInfo(bulkSparkConf(), bridgeVersion); |
There was a problem hiding this comment.
We seem to buildCassandraContext thrice among these 3 method, each cassandra context initializes its own sidecar client. We should avoid this and combine them.
There was a problem hiding this comment.
Looking into the possibility, seems difficult as we need to build context once and store somewhere to reuse it.
There was a problem hiding this comment.
Fixed this. There is a chicken and egg problem here. Context is needed to determine the bridge, and context contains determined bridge version. Hence creating preliminary context initially (with null bridge version) which is used for bridge determination, then we create final context with the bridge value. Also made changes to reuse nodesettings.
| for (String clusterId : coordinatedWriteConf.clusters().keySet()) | ||
| { | ||
| Set<String> sstableVersions = CassandraClusterInfo.getSSTableVersionsOnCluster(conf, clusterId); | ||
| aggregatedSSTableVersions.addAll(sstableVersions); |
There was a problem hiding this comment.
the check we do comparing lowest and highest Cassandra version in getLowestCassandraVersion is missing when we get bridge based on SSTable versions.
There was a problem hiding this comment.
We do that above in getLowestCassandraVersion function, line 393 sorts versions, line 396 picks the lowest and line 397 picks the highest.
Whereas this function is getSSTableVersionsOnCluster, to get all sstable versions on the cluster. Then bridge determination logic sorts them, picks highest version and determines bridge version accordingly.
Determines the Cassandra bridge version from the highest SSTable version found on the cluster, making analytics independent of the Cassandra server version. Applied to both bulk writer and reader paths.
590d24d to
399af9a
Compare
| * @return new {@link CassandraClusterInfoGroup} instance | ||
| */ | ||
| public static CassandraClusterInfoGroup fromBulkSparkConf(BulkSparkConf conf) | ||
| public static CassandraClusterInfoGroup fromBulkSparkConf(BulkSparkConf conf, CassandraVersion bridgeVersion) |
There was a problem hiding this comment.
NIT, bridgeVersion seems always passed null.
There was a problem hiding this comment.
Removed bridgeVersion param
|
|
||
| // Initialize SSTable versions and bridge version | ||
| CassandraVersion bridgeVersion = initializeSSTableVersionsAndBridgeVersion(cassandraVersion); | ||
| bridge = CassandraBridgeFactory.get(bridgeVersion); |
There was a problem hiding this comment.
Should we add here the log analogical to write-side, e.g. "Selected bridge version: {}, SSTable versions: {}?
In the determineBridgeVersionForRead we have below log which is not present in determineBridgeVersionForWrite.
LOGGER.debug("Determined bridge version {} for read based on SSTable versions on cluster: {}", bridgeVersion.versionName(), sstableVersionsOnCluster);
I would suggest to make both logs about determined sstable version "symmetric".
There was a problem hiding this comment.
Write had log message in the caller. Now removed that and added comment in determineBridgeVersionForWrite. Also made log messages in determineBridgeVersionForRead and determineBridgeVersionForWrite as info as it gets printed only once in driver and helps in debugging.
| } | ||
| else | ||
| { | ||
| this.sstableVersionsOnCluster = retrieveSSTableVersionsFromCluster(); |
There was a problem hiding this comment.
HashSet is not guaranteed here, compare comment above.
| if ("bti".equals(sstableFormat())) | ||
| { | ||
| // BTI (bti-da) is a Cassandra 5.0+ format; skip on older versions. | ||
| assumeTrue(testVersion.version().startsWith("5."), |
There was a problem hiding this comment.
NIT: Just to support Cassandra 6, maybe we can do:
Semver version = new Semver(testVersion.version(), Semver.SemverType.LOOSE);
assumeTrue(version.isGreaterThanOrEqualTo(new Semver("5.0", Semver.SemverType.LOOSE)),
"BTI format (bti-da) requires Cassandra 5.0+, but test version is " + testVersion.version());
|
Using this branch I was able to bundle DS Cassandra bridges that process |
yifan-c
left a comment
There was a problem hiding this comment.
Submit since there are a handful of comments already. Note that it is not a full review.
| public static final String CASSANDRA_VERSION = SETTING_PREFIX + "cassandra.version"; | ||
| // Disable SSTable version-based bridge determination. When true, falls back to using cassandra.version for bridge selection. | ||
| // This provides a safety fallback mechanism if SSTable version detection fails or encounters issues. | ||
| public static final String DISABLE_SSTABLE_VERSION_BASED_BRIDGE = SETTING_PREFIX + "bridge.disable_sstable_version_based"; |
There was a problem hiding this comment.
- add one more space before
=to align with the other lines. - The exact same config is defined again in SSTableVersionAnalyzer. It is error-prone. The spark config key should be defined in one single place, and it should be declared in
BulkSparkConf. SSTableVersionAnalyzershould be relocated toanayltics-coresubproject. The reason is that thecommonsubproject should be spark-agnostic.SSTableVersionAnalyzercurrently reference to spark config key. Plus, the original version determination code is inanalytics-coretoo.
There was a problem hiding this comment.
SSTableVersionAnalyzer should live in cassandra-analytics-core, not common. Its only callers (AbstractBulkWriterContext, CassandraDataLayer) are in core, and the version-determination logic it replaces (CassandraClusterInfo.getVersionFromSidecar) has always been in core.
Placing it in common also forces the DISABLE_SSTABLE_VERSION_BASED_BRIDGE Spark config key into the common module, which should remain Spark-agnostic. Move SSTableVersionAnalyzer (and its test) to core, define the constant solely in BulkSparkConf, and reference BulkSparkConf.DISABLE_SSTABLE_VERSION_BASED_BRIDGE from the analyzer's error messages.
| // Check for fallback mode | ||
| Optional<CassandraVersion> fallback = resolveFallbackVersion(cassandraVersion, isSSTableVersionBasedBridgeDisabled); | ||
| if (fallback.isPresent()) | ||
| { | ||
| return fallback.get(); | ||
| } |
There was a problem hiding this comment.
The method resolveFallbackVersion is misleading. "Fallback" implies the method is triggered when SSTable version-based selection has failed, but it actually represents the original pre-patch behavior — bridge selection driven by the Cassandra release version — which the user opts into explicitly via configuration. Nothing has failed when this path is taken.
I'd suggest rename the method to resolveLegacyVersionBasedBridge and the variable to version.
| .orElseThrow(() -> new IllegalStateException( | ||
| String.format("Unknown SSTable version: %s. Cannot determine bridge version. " + | ||
| "SSTable versions on cluster: %s. " + | ||
| "To retry the job using a fallback Cassandra version, " + |
There was a problem hiding this comment.
The "fallback" wording is confusing. Refer to my previous comment for the reasoning.
There is no fallback that happens automatically; therefore it is not a fallback.
If my understanding is correct, it is to ask user to retry with the original cassandra.version-based bridge selection. Please update the error message to reflect the fact.
| metadata -> new RingInstance(metadata, clusterId())); | ||
| } | ||
|
|
||
| public String getVersionFromFeature() |
There was a problem hiding this comment.
The behavior is changed unexpectedly with the method deletion.
getVersionFromFeature is used in getLowestCassandraVersion to allow version override via feature flag. The override behavior should be kept, i.e. a feature flag can override the determined cassandra version from the sstable analyzer.
This patch should only introduce the new sstable based cassandra version determination logic.
There was a problem hiding this comment.
With bridge determination feature getLowestCassandraVersion is only used for logging in the driver. Executors doesn't need it. If I remember correctly (opened PR more than a month ago, might have forgotten a bit), I might have modified getLowestCassandraVersion as one time call as not needed on executors. Let me try to recollect and restore it.
There was a problem hiding this comment.
I have restored getVersionFromFeature. But old style getLowestCassandraVersion with cache value is not needed as it now gets invoked only on driver
|
|
||
| // Determine bridge version | ||
| this.bridgeVersion = SSTableVersionAnalyzer.determineBridgeVersionForWrite( | ||
| sstableVersionsOnCluster, |
There was a problem hiding this comment.
sstableVersionsOnCluster can be null when isSSTableVersionBasedBridgeDisabled == true. But SSTableVersionAnalyzer.determineBridgeVersionForWrite disallows null value.
In fact, you may not need the boolean parameter isSSTableVersionBasedBridgeDisabled in the method SSTableVersionAnalyzer.determineBridgeVersionForWrite. When sstableVersionsOnCluster is null, it already indicate the feature is disabled.
Can you update the corresponding code paths to simplify the implementation?
There was a problem hiding this comment.
- determineBridgeVersionForWrite first checks for the isSSTableVersionBasedBridgeDisabled, if true uses fallback logic without looking at sstableVersionsOnCluster value.
- if isSSTableVersionBasedBridgeDisabled is false, then we must have retrieved sstableVersionsOnCluster and there should be atleast one sstable on the cluster, hence we shouldn't have it null or empty.
- We have feature flag available here, better to use it instead of hacking on sstableVersionsOnCluster being null which can be error case as well by any chance.
| // Validate that Kryo registrator exists for this bridge version | ||
| KryoRegister.validateKryoRegistratorExists(this.bridgeVersion, lowestCassandraVersion); |
There was a problem hiding this comment.
I believe KryoRegistrator is not really the concern; instead, the real concern is to ensure the bridgeVersion determined is recognized.
To make the API clean, I'd rather suggest the change to ensure that SSTableVersionAnalyzer always returns a recognized bridgeVersion or throw when unable to determine.
There was a problem hiding this comment.
We added this call KryoRegister.validateKryoRegistratorExists when we were adding only one cassandra.version registrator class to KRYO_SERIALIZERS, which may not be the same as determined bridgerversion. Now I have modified KryoRegister to add all implemented registrator classes to KRYO_SERIALIZERS, so this check is not needed. We can leave it as it is just to verify upfront or we can remove it.
SSTableVersionAnalyzer already ensures only recognized bridgeversion is returned.
There was a problem hiding this comment.
Please remove the unnecessary code. In general, less lines of code the better.
| /** | ||
| * Retrieves the lowest Cassandra version from all contained clusters. | ||
| * | ||
| * @return lowest Cassandra version string across all clusters | ||
| */ | ||
| public String getLowestCassandraVersion() | ||
| { | ||
| Map<String, String> clusterVersions = new HashMap<>(); | ||
| for (ClusterInfo ci : clusterInfos) | ||
| { | ||
| CassandraClusterInfo cci = (CassandraClusterInfo) ci; | ||
| clusterVersions.put(ci.clusterId(), cci.getLowestCassandraVersion()); | ||
| } | ||
|
|
||
| // Find the lowest version across all clusters | ||
| List<CassandraVersionFeatures> versions = clusterVersions.values() | ||
| .stream() | ||
| .map(CassandraVersionFeatures::cassandraVersionFeaturesFromCassandraVersion) | ||
| .sorted() | ||
| .collect(Collectors.toList()); | ||
|
|
||
| CassandraVersionFeatures first = versions.get(0); | ||
| CassandraVersionFeatures last = versions.get(versions.size() - 1); | ||
| Preconditions.checkState(first.getMajorVersion() == last.getMajorVersion(), | ||
| "Cluster versions are not compatible. lowest=%s and highest=%s", | ||
| first.getRawVersionString(), last.getRawVersionString()); | ||
|
|
||
| return first.getRawVersionString(); | ||
| } | ||
|
|
There was a problem hiding this comment.
Why is the original implementation getLowestCassandraVersion in this class changed?
I do not see an obvious benefit; but the unrelated change is making code review hard. Please revert the change, unless there is a pressing reason.
There was a problem hiding this comment.
Now lowestCassandraVersion is driver only, not needed on executors. Hence rewritten getLowestCassandraVersion for one time call.
There was a problem hiding this comment.
It is not true. Cassandra version should be determined on task start (on executors) and fail the task early.
| aggregatedSSTableVersions.addAll(cci.getSSTableVersionsOnCluster()); | ||
| } | ||
| return aggregatedSSTableVersions; | ||
| } |
There was a problem hiding this comment.
Follow the pattern in the original getLowestCassandraVersion, i.e. return cached if available, otherwise get first, finally aggregate when there are multiple.
There was a problem hiding this comment.
getSSTableVersionsOnCluster is called only on driver. Executors get determined bridge version. So there is no cached value to return.
| CassandraVersionFeatures first = versions.get(0); | ||
| CassandraVersionFeatures last = versions.get(versions.size() - 1); | ||
| Preconditions.checkState(first.getMajorVersion() == last.getMajorVersion(), | ||
| "Cluster versions are not compatible. lowest=%s and highest=%s", | ||
| first.getRawVersionString(), last.getRawVersionString()); |
There was a problem hiding this comment.
This check should be lifted when sstable-based bridge determination is enabled. Mismatching major version no longer mean incompatibility necessarily.
There was a problem hiding this comment.
On the cluster, if one node is running 5.0 and another node running 4.x, the bulk writer selects highest supported format 5.0 as bridge and writes BTI or BIG-oa sstables. 4.x node will fail to read them. This will impact live node operations reading that table as well.
The assumption behind this new feature is
- either all nodes in a cluster are in 4.x version
- or all nodes in a cluster in 5.0 version and SCM is cassandra_4 or upgrading or none or combination of these.
In both cases, lowest and highest major version should match. Hence we need to have this check in bridge selection logic for write as well, which I missed adding.
Other option is, we need to select lowest supported sstable format for write, so all nodes understands the sstables being written. With this we need to allow lowest and highest major versions to differ as long as lowest sstable version can be read on highest cassandra version node. i.e, we need to change bridge determination logic to lowest sstable version for bulk write and highest sstable version for bulk read.
There was a problem hiding this comment.
The version determination logic in the SSTableVersionAnalyzer in wrong. Writer should pick the lowest compatible bridge; reader should pick the highest compatible bridge. Please check out the fix in https://github.com/yifan-c/cassandra-analytics/commits/simplify-sstable-bridge/
There was a problem hiding this comment.
- Create getBridgeVersion which first picks version override if available, otherwise next picks bridge based on sstables version (if feature enabled), otherwise picks bridge legacy style
- Modified determineBridgeVersionForWrite to pick lowest version as bridge, this picks lowest compatible version for a cluster
- CassandraClusterInfoGroup's getBridgeVersion picks lowest compatible version across clusters and verifies highest version can read sstables written by selected bridge version
- Modified CassandraVersion enum to have lowestCompatibleVersionNumber instead of calculating dynamically
- Modified bulk reader's validateSStableVersions (runs on executors) to ensure bridge version can support sstables seen (previously was just checking sstable versions seen are same as driver seen), this allows different sstable versions to be written by the time executors run and works as long as bridge can support them
Jira tasks: CASSANALYTICS-24 and CASSANALYTICS-25
This PR adds sstable version based bridge determination feature to C* analytics. This feature allows C* analytics to choose bridge (which C* version jar to load) based on sstable versions existing on the cluster, instead of strictly depending on running C* versions. This is needed especially to support C* 5.0, which supports multiple compatibility modes.
CI run: https://app.circleci.com/pipelines/github/skoppu22/cassandra-analytics/37/workflows/da025a29-00c0-41f2-95e8-63c2b7f1f276