Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ workflows:
spark: ["3"]
scala: ["2.13"]
jdk: ["11"]
cassandra: ["5.0.5"]
cassandra: ["5.0.7"]

# Cassandra 5.0 on Spark 4 / Scala 2.13 / JDK 17
- int-test:
Expand All @@ -386,4 +386,4 @@ workflows:
spark: ["4"]
scala: ["2.13"]
jdk: ["17"]
cassandra: ["5.0.5"]
cassandra: ["5.0.7"]
10 changes: 5 additions & 5 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,13 @@ jobs:
# into each match. To add a new version: add one entry to 'config' and one to
# 'include'.
matrix:
config: ['s2.13-c5.0.5', 's2.12-c4.1.4', 's2.12-c4.0.17', 's2.13-c5.0.5-spark4']
config: ['s2.13-c5.0.7', 's2.12-c4.1.4', 's2.12-c4.0.17', 's2.13-c5.0.7-spark4']
job_index: [0, 1, 2, 3, 4]
job_total: [5]
include:
- config: 's2.13-c5.0.5'
- config: 's2.13-c5.0.7'
scala: '2.13'
cassandra: '5.0.5'
cassandra: '5.0.7'
jdk: '11'
spark: '3'
- config: 's2.12-c4.1.4'
Expand All @@ -242,9 +242,9 @@ jobs:
cassandra: '4.0.17'
jdk: '11'
spark: '3'
- config: 's2.13-c5.0.5-spark4'
- config: 's2.13-c5.0.7-spark4'
scala: '2.13'
cassandra: '5.0.5'
cassandra: '5.0.7'
jdk: '17'
spark: '4'
fail-fast: false
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ ext.dependencyLocation = (System.getenv("CASSANDRA_DEP_DIR") ?: "${rootDir}/depe
// - cassandraFullVersionMap values must match the supported_versions default
// NOTE: Both maps must ALSO stay in sync with the values in build-dtest-jars.sh
ext.cassandraVersionEnumMap = ["4.0": "FOURZERO", "4.1": "FOURONE", "5.0": "FIVEZERO"]
ext.cassandraFullVersionMap = ["4.0": "4.0.17", "4.1": "4.1.4", "5.0": "5.0.5"]
ext.cassandraFullVersionMap = ["4.0": "4.0.17", "4.1": "4.1.4", "5.0": "5.0.7"]

// Shared helper: sets implemented_versions and supported_versions system properties on a Test task.
// When majorMinor is provided (e.g. "4.0"), uses that version directly.
Expand Down
2 changes: 1 addition & 1 deletion cassandra-analytics-cdc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def configureCdcTestTask = { Test task, String majorMinor = null ->
// Full version format to match CDC's TestVersionSupplier; tests both versions for backward compat.
// 4.1 intentionally excluded from gradlew defaults to keep local iteration fast;
// use testCassandra41 for targeted 4.1 runs. CI covers 4.1 via CASSANDRA_VERSION env var.
task.systemProperty "cassandra.sidecar.versions_to_test", "4.0.17,5.0.5"
task.systemProperty "cassandra.sidecar.versions_to_test", "4.0.17,5.0.7"
}

task.minHeapSize = '1024m'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
Expand All @@ -74,6 +75,8 @@
import org.apache.cassandra.spark.data.ReplicationFactor;
import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
import org.apache.cassandra.spark.data.partitioner.Partitioner;
import org.apache.cassandra.spark.data.types.Duration;
import org.apache.cassandra.spark.data.types.TimeUUID;
import org.apache.cassandra.spark.utils.AsyncExecutor;
import org.apache.cassandra.spark.utils.ByteBufferUtils;
import org.apache.cassandra.spark.utils.IOUtils;
Expand All @@ -92,6 +95,7 @@
import static org.apache.cassandra.spark.CommonTestUtils.cql3Type;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.quicktheories.QuickTheory.qt;
import static org.quicktheories.generators.SourceDSL.arbitrary;

Expand Down Expand Up @@ -442,6 +446,50 @@ public void testList(CassandraVersion version)
.run());
}

@ParameterizedTest
@MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions")
public void testVector(CassandraVersion version)
{
assumeThat(bridge.getVersion().versionNumber()).isGreaterThanOrEqualTo(CassandraVersion.FIVEZERO.versionNumber());
qt().forAll(cql3Type(bridge))
// Cassandra VectorType does not support swapping custom subtype serializer,
// so we cannot use AnalyticsTimeUUIDSerializer or AnalyticsDurationSerializer.
.assuming(t -> !t.cqlName().equals(Duration.INSTANCE.name()) && !t.cqlName().equals(TimeUUID.INSTANCE.name()))
.checkAssert(
t ->
testWith(bridge, cdcBridge, commitLogDir, TestSchema.builder(bridge)
.withPartitionKey("pk", bridge.uuid())
.withColumn("c1", bridge.bigint())
.withColumn("c2", bridge.vector(t, 5)))
.withCdcEventChecker((testRows, events) -> {
for (CdcEvent event : events)
{
assertThat(event.getPartitionKeys().size()).isEqualTo(1);
assertThat(event.getPartitionKeys().get(0).columnName).isEqualTo("pk");
assertThat(event.getClusteringKeys()).isNull();
assertThat(event.getStaticColumns()).isNull();
assertThat(event.getValueColumns().stream()
.map(v -> v.columnName)
.collect(Collectors.toList())).isEqualTo(Arrays.asList("c1", "c2"));
Value vectorValue = event.getValueColumns().get(1);
String vectorType = vectorValue.columnType;
assertThat(vectorType.startsWith("vector<")).isTrue();
assertThat(vectorType.endsWith(">")).isTrue();
assertCqlTypeEquals(t.cqlName(),
vectorType.substring(vectorType.indexOf("<") + 1, vectorType.indexOf(","))); // extract the type in vector<?, ?>
String dimensions = StringUtils.substringAfter(vectorType, ",");
dimensions = dimensions.substring(0, dimensions.length() - 1).trim();
assertThat(dimensions).isEqualTo("5");
Object v = bridge.parseType(vectorType).deserializeToJavaType(vectorValue.getValue());
assertThat(v).isInstanceOf(List.class);
List list = (List) v;
assertThat(list.size()).isGreaterThan(0);
assertThat(event.getTtl()).isNull();
}
})
.run());
}

@ParameterizedTest
@MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions")
public void testMap(CassandraVersion version)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ private TestVersionSupplier()

public static Stream<CassandraVersion> testVersions()
{
String versions = System.getProperty("cassandra.sidecar.versions_to_test", "4.0.17,5.0.5");
String versions = System.getProperty("cassandra.sidecar.versions_to_test", "4.0.17,5.0.7");
return Arrays.stream(versions.split(","))
.map(String::trim)
.map(v -> CassandraVersion.fromVersion(v).orElseThrow(() -> new IllegalArgumentException("Unsupported version: " + v)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
* NOTE: The following values need to stay in sync with:
* - build.gradle:
* - ext.cassandraVersionEnumMap = ["4.0": "FOURZERO", "4.1": "FOURONE", "5.0": "FIVEZERO"]
* - ext.cassandraFullVersionMap = ["4.0": "4.0.17", "4.1": "4.1.4", "5.0": "5.0.5"]
* - ext.cassandraFullVersionMap = ["4.0": "4.0.17", "4.1": "4.1.4", "5.0": "5.0.7"]
* - build-dtest-jars.sh:
* - CANDIDATE_BRANCHES=(
* "cassandra-4.0:cassandra-4.0.17"
* "cassandra-4.1:99d9faeef57c9cf5240d11eac9db5b283e45a4f9"
* "cassandra-5.0:cassandra-5.0.5"
* "cassandra-5.0:cassandra-5.0.7"
*/
public enum CassandraVersion
{
Expand Down Expand Up @@ -98,7 +98,7 @@ public String jarBaseName()

// NOTE: These default versions must stay in sync with cassandraFullVersionMap in build.gradle.
String providedSupportedVersionsOrDefault = System.getProperty("cassandra.analytics.bridges.supported_versions",
"cassandra-4.0.17,cassandra-5.0.5");
"cassandra-4.0.17,cassandra-5.0.7");
supportedVersions = Arrays.stream(providedSupportedVersionsOrDefault.split(","))
.filter(version -> CassandraVersion.fromVersion(version)
.filter(v -> v.sstableFormats.contains(sstableFormat))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
public abstract class CassandraTypes
{
public static final Pattern COLLECTION_PATTERN = Pattern.compile("^(set|list|map|tuple)<(.+)>$", Pattern.CASE_INSENSITIVE);
public static final Pattern VECTOR_PATTERN = Pattern.compile("^(vector)<(.+),(.+)>$", Pattern.CASE_INSENSITIVE);
public static final Pattern FROZEN_PATTERN = Pattern.compile("^frozen<(.*)>$", Pattern.CASE_INSENSITIVE);

private final UDTs udts = new UDTs();
Expand Down Expand Up @@ -133,6 +134,8 @@ public List<CqlField.NativeType> supportedTypes()

public abstract CqlField.CqlList list(CqlField.CqlType type);

public abstract CqlField.CqlVector vector(CqlField.CqlType type, int dimensions);

public abstract CqlField.CqlSet set(CqlField.CqlType type);

public abstract CqlField.CqlMap map(CqlField.CqlType keyType, CqlField.CqlType valueType);
Expand Down Expand Up @@ -189,6 +192,14 @@ public CqlField.CqlType parseType(String type, Map<String, CqlField.CqlUdt> udts
.map(collectionType -> parseType(collectionType, udts))
.toArray(CqlField.CqlType[]::new));
}
Matcher vectorMatcher = VECTOR_PATTERN.matcher(type);
if (vectorMatcher.find())
{
// CQL vector
String subType = vectorMatcher.group(2);
int dimensions = Integer.parseInt(vectorMatcher.group(3).trim());
return vector(parseType(subType, udts), dimensions);
}
Matcher frozenMatcher = FROZEN_PATTERN.matcher(type);
if (frozenMatcher.find())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public interface CqlType extends Serializable
{
enum InternalType
{
NativeCql, Set, List, Map, Frozen, Udt, Tuple;
NativeCql, Set, List, Map, Frozen, Udt, Tuple, Vector;

public static InternalType fromString(String name)
{
Expand All @@ -77,6 +77,8 @@ public static InternalType fromString(String name)
return Set;
case "list":
return List;
case "vector":
return Vector;
case "map":
return Map;
case "tuple":
Expand Down Expand Up @@ -237,6 +239,10 @@ public interface CqlList extends CqlCollection
{
}

public interface CqlVector extends CqlCollection
{
}

public interface CqlTuple extends CqlCollection
{
ByteBuffer serializeTuple(Object[] values);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public final class CqlUtils
"min_index_interval",
"max_index_interval"
);
private static final Pattern REPLICATION_FACTOR_PATTERN = Pattern.compile("WITH REPLICATION = (\\{[^\\}]*\\})");
private static final Pattern REPLICATION_FACTOR_PATTERN = Pattern.compile("WITH REPLICATION = (\\{[^\\}]*\\})", Pattern.CASE_INSENSITIVE);
// Initialize a mapper allowing single quotes to process the RF string from the CREATE KEYSPACE statement
private static final ObjectMapper MAPPER = new ObjectMapper().configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
private static final Pattern ESCAPED_WHITESPACE_PATTERN = Pattern.compile("(\\\\r|\\\\n|\\\\r\\n)+");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public final class SqlToCqlTypeConverter implements Serializable
public static final String UDT = "udt";
public static final String VARCHAR = "varchar";
public static final String VARINT = "varint";
public static final String VECTOR = "vector";
private static final Logger LOGGER = LoggerFactory.getLogger(SqlToCqlTypeConverter.class);
private static final NoOp<Object> NO_OP_CONVERTER = new NoOp<>();
private static final LongConverter LONG_CONVERTER = new LongConverter();
Expand Down Expand Up @@ -165,6 +166,7 @@ public static Converter<?> getConverter(CqlField.CqlType cqlType)
case TINYINT:
return NO_OP_CONVERTER;
case LIST:
case VECTOR:
return new ListConverter<>((CqlField.CqlCollection) cqlType);
case MAP:
assert cqlType instanceof CqlField.CqlMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.cassandra.spark.utils.RandomUtils;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.quicktheories.QuickTheory.qt;
import static org.quicktheories.generators.SourceDSL.arbitrary;
import static org.quicktheories.generators.SourceDSL.booleans;
Expand Down Expand Up @@ -172,6 +173,32 @@ public void testCqlFieldList(CassandraBridge bridge)
});
}

@ParameterizedTest
@MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
public void testCqlFieldVector(CassandraBridge bridge)
{
assumeThat(bridge.getVersion().versionNumber()).isGreaterThanOrEqualTo(CassandraVersion.FIVEZERO.versionNumber());
qt().withExamples(25)
.forAll(booleans().all(), booleans().all(), TestUtils.cql3Type(bridge), integers().all())
.checkAssert((isPartitionKey, isClusteringKey, cqlType, position) -> {
CqlField.CqlVector vectorType = bridge.vector(cqlType, 5);
CqlField field = new CqlField(isPartitionKey,
isClusteringKey && !isPartitionKey,
false,
RandomUtils.randomAlphanumeric(5, 20),
vectorType,
position);
Output out = serialize(bridge.getVersion(), field);
CqlField deserialized = deserialize(bridge.getVersion(), out, CqlField.class);
assertThat(deserialized).isEqualTo(field);
assertThat(deserialized.name()).isEqualTo(field.name());
assertThat(deserialized.type()).isEqualTo(field.type());
assertThat(deserialized.position()).isEqualTo(field.position());
assertThat(deserialized.isPartitionKey()).isEqualTo(field.isPartitionKey());
assertThat(deserialized.isClusteringColumn()).isEqualTo(field.isClusteringColumn());
});
}

@ParameterizedTest
@MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
public void testCqlFieldMap(CassandraBridge bridge)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public interface CommitResultSupplier extends BiFunction<List<String>, String, D
{
}

public static final String DEFAULT_CASSANDRA_VERSION = "cassandra-5.0.5";
public static final String DEFAULT_CASSANDRA_VERSION = "cassandra-5.0.7";

private final UUID jobId;
private boolean skipClean = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ void testWriteWithSubRanges(String version)
@MethodSource("data")
void testWriteWithDataInMultipleSubRanges(String version)
{
version = "cassandra-5.0.5";
version = "cassandra-5.0.7";
setUp(version);
MockBulkWriterContext m = Mockito.spy(writerContext);
TokenPartitioner mtp = Mockito.mock(TokenPartitioner.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ private void setup(ConsistencyLevel.CL consistencyLevel)
{
digestAlgorithm = new XXHash32DigestAlgorithm();
tableWriter = new MockTableWriter(folder);
writerContext = new MockBulkWriterContext(TOKEN_RANGE_MAPPING, "cassandra-5.0.5", consistencyLevel);
writerContext = new MockBulkWriterContext(TOKEN_RANGE_MAPPING, "cassandra-5.0.7", consistencyLevel);
writerContext.setReplicationFactor(new ReplicationFactor(NetworkTopologyStrategy, rfOptions));
transportContext = (TransportContext.DirectDataBulkWriterContext) writerContext.transportContext();
}
Expand Down
Loading
Loading