CASSANALYTICS-31 SAI index support in analytics#220
Conversation
There was a problem hiding this comment.
Could you please add test for new methods that are being added specially for SAI file names?
| */ | ||
| public static boolean isDataComponent(Path path) | ||
| { | ||
| return isDataComponent(path.getFileName().toString()); |
There was a problem hiding this comment.
Should we do a null check?
| private static final Pattern COMPACTION_STRATEGY_PATTERN = Pattern.compile("compaction\\s*=\\s*\\{\\s*'class'\\s*:\\s*'([^']+)'"); | ||
|
|
||
| private static final Pattern MULTI_WHITESPACE_PATTERN = Pattern.compile("\\s+"); | ||
| private static final Pattern SAI_USING_PATTERN = Pattern.compile("USING '[^']*STORAGEATTACHEDINDEX'"); |
There was a problem hiding this comment.
This pattern matches with something like "USING 'PrefixStorageAttachedIndex'" which is incorrect, how about using something like "Pattern.compile("USING '([^']*\.)?STORAGEATTACHEDINDEX'")"
| } | ||
|
|
||
| @VisibleForTesting | ||
| static boolean isCassandra5OrLater(String version) |
There was a problem hiding this comment.
Can u check the correctness of this method for a version like 3.11.x? May be worth adding tests for some of the methods you added here in this class
| * @param indexStatements the CREATE INDEX statements for the table | ||
| * @param lowestCassandraVersion the lowest Cassandra version in the cluster | ||
| */ | ||
| static void validateSecondaryIndexes(TableInfoProvider tableInfo, |
There was a problem hiding this comment.
Can u add tests for this method?
| return; | ||
| } | ||
|
|
||
| throw new UnsupportedAnalyticsOperationException("Bulkwriter doesn't support secondary indexes"); |
There was a problem hiding this comment.
How about changing exception message to ""Bulkwriter doesn't support non-SAI indexes."
| * @param indexStatements the CREATE INDEX statements for the table | ||
| * @param lowestCassandraVersion the lowest Cassandra version in the cluster | ||
| */ | ||
| static void validateSecondaryIndexes(TableInfoProvider tableInfo, |
There was a problem hiding this comment.
Is TableInfoProvider tableInfo, parameter needed? can we may be do
if (indexStatements.isEmpty()) instead of if (!tableInfo.hasSecondaryIndex())
There was a problem hiding this comment.
Since this is making important change to the schema, could you please add tests for this class?
| CqlTable cqlTable = new SchemaBuilder(createStatement, keyspace, replicationFactor, partitioner, | ||
| cassandraTypes -> udts, tableId, indexStatements, enableCdc).build(); | ||
| // SAI is a Cassandra 5.0+ feature: register any Storage Attached Index definitions on the table metadata | ||
| // after the shared SchemaBuilder has built/registered the (index-less) table. | ||
| // buildSchema is invoked repeatedly within a JVM (the index-carrying per-partition RecordWriter included), | ||
| // so this re-applies the SAI definitions whenever a rebuild left the table without them. | ||
| StorageAttachedIndexApplier.maybeApply(keyspace, cqlTable.table(), indexStatements); | ||
| return cqlTable; |
There was a problem hiding this comment.
Schema Is created first and then SAI indexes are attached in the next step, I wondering about the brief moment between two statements where the schema doesn't have SAI indexes.
WDYT about merging both operations into a single CassandraSchema.apply in the 5.0 bridge? you might have to override CassandraSchema.apply call entirely from the 5.0 bridge and don't use SchemaBuilder's constructor at all for the schema update.
I am open to other approaches too
|
|
||
| // This builder always produces an index-less table (indexes are applied later by the 5.0 bridge), so a | ||
| // rebuild never carries indexes even if the caller passed index statements. buildSchema runs repeatedly per | ||
| // table in a JVM; if an earlier call already registered indexes, copy them onto this rebuild so it matches | ||
| // the registered table. | ||
| if (maybeExistingTableMetadata != null | ||
| && !maybeExistingTableMetadata.indexes.isEmpty() | ||
| && tableMetadata.indexes.isEmpty()) | ||
| { | ||
| tableMetadata = tableMetadata.unbuild() | ||
| .indexes(maybeExistingTableMetadata.indexes) | ||
| .build(); | ||
| } | ||
|
|
There was a problem hiding this comment.
RecordWriter is using 5 arg buildSchema() method which is not attaching index statements, you could eliminate copying indexes here if you use the 8 arg buildSchema() method in RecordWriter and can eventually get rid of 5 arg buildSchema() method
cqlTable = writerContext.bridge()
.buildSchema(writerContext.schema().getTableSchema().createStatement,
writerContext.job().qualifiedTableName().keyspace(),
IGNORED_REPLICATION_FACTOR,
writerContext.cluster().getPartitioner(),
writerContext.schema().getUserDefinedTypeStatements(),
null,
writerContext.schema().getTableSchema().getIndexStatements(),
false);
Generate SAI index files as well while producing sstable files, so no need of async rebuilding of SAI indexes after bulk write.
Circle CI : https://app.circleci.com/pipelines/github/skoppu22/cassandra-analytics/148/workflows/5eaeec43-e43b-444c-badb-ef696a3828fc