diff --git a/CHANGES.txt b/CHANGES.txt index 58d4cf960..e1c21b0d2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 0.5.0 ----- + * Exclude IP address from RingInstance equality so node replacement does not fail bulk write jobs (CASSANALYTICS-175) * Regenerate bloom filters for CQLSSTableWriter (CASSANALYTICS-167) * Avoid Spark 4 partitioning warnings during bulk reads (CASSANALYTICS-171) * Spark 4.0 Support (CASSANALYTICS-34) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java index ea8231a02..5e5ed60e9 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java @@ -123,9 +123,12 @@ public NodeStatus nodeStatus() } /** - * Custom equality that compares the token, fully qualified domain name, the port, the datacenter and the clusterId + * Custom equality that compares the token, fully qualified domain name, the rack, the port, the datacenter + * and the clusterId * - * Note that node state, status, are not part of the calculation. + * Note that node state, status and IP address are not part of the calculation. The IP address is excluded + * because a node can come back with a different IP address (e.g. a pod replacement in Kubernetes) while + * remaining the same logical instance. * * @param other the other instance * @return true if both instances are equal, false otherwise @@ -147,22 +150,22 @@ public boolean equals(@Nullable Object other) && Objects.equals(ringEntry.token(), that.ringEntry.token()) && Objects.equals(ringEntry.fqdn(), that.ringEntry.fqdn()) && Objects.equals(ringEntry.rack(), that.ringEntry.rack()) - && Objects.equals(ringEntry.address(), that.ringEntry.address()) && ringEntry.port() == that.ringEntry.port() && Objects.equals(ringEntry.datacenter(), that.ringEntry.datacenter()); } /** - * Custom hashCode that compares the token, fully qualified domain name, the port, and the datacenter + * Custom hashCode that hashes the token, fully qualified domain name, the rack, the port, the datacenter + * and the clusterId * - * Note that node state and status are not part of the calculation. + * Note that node state, status and IP address are not part of the calculation. * * @return The hashcode of this instance based on the important fields */ @Override public int hashCode() { - return Objects.hash(clusterId, ringEntry.token(), ringEntry.fqdn(), ringEntry.rack(), ringEntry.port(), ringEntry.datacenter(), ringEntry.address()); + return Objects.hash(clusterId, ringEntry.token(), ringEntry.fqdn(), ringEntry.rack(), ringEntry.port(), ringEntry.datacenter()); } @Override diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java index 97362ec3d..b73cc272a 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java @@ -99,6 +99,21 @@ public void testEqualsAndHashcodeIgnoreNonCriticalFields() assertThat(instance1.hashCode()).isEqualTo(instance2.hashCode()); } + @Test + public void testEqualsAndHashcodeIgnoreIpAddress() + { + // The same logical instance can come back with a different IP address, + // e.g. when Kubernetes replaces a pod; it should compare equal + RingInstance instance1 = new RingInstance(mockRingEntryBuilder() + .address("127.0.0.1") + .build()); + RingInstance instance2 = new RingInstance(mockRingEntryBuilder() + .address("127.0.0.2") + .build()); + assertThat(instance1).isEqualTo(instance2); + assertThat(instance1.hashCode()).isEqualTo(instance2.hashCode()); + } + @Test public void testEqualsAndHashcodeConsidersClusterId() { diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java index 64ee81b74..4864144e3 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java @@ -85,6 +85,44 @@ public static TokenRangeMapping buildTokenRangeMappingWithFailures new HashSet<>(instances)); } + /** + * Builds the same topology as {@link #buildTokenRangeMapping(int, ImmutableMap, int)}, but every instance + * has a different IP address, simulating nodes coming back with new IPs, e.g. pod replacement in Kubernetes + */ + public static TokenRangeMapping buildTokenRangeMappingWithChangedIpAddresses(int initialToken, + ImmutableMap rfByDC, + int instancesPerDC) + { + List instances = getInstances(initialToken, rfByDC, instancesPerDC) + .stream() + .map(TokenRangeMappingUtils::withChangedIpAddress) + .collect(Collectors.toList()); + ReplicationFactor replicationFactor = getReplicationFactor(rfByDC); + Multimap> tokenRanges = setupTokenRangeMap(Partitioner.Murmur3Partitioner, replicationFactor, instances); + return new TokenRangeMapping<>(Partitioner.Murmur3Partitioner, + tokenRanges, + new HashSet<>(instances)); + } + + private static RingInstance withChangedIpAddress(RingInstance instance) + { + RingEntry entry = instance.ringEntry(); + RingEntry newEntry = new RingEntry.Builder() + .datacenter(entry.datacenter()) + .port(entry.port()) + .address(entry.address().replace("127.", "10.")) + .status(entry.status()) + .state(entry.state()) + .token(entry.token()) + .fqdn(entry.fqdn()) + .rack(entry.rack()) + .owns(entry.owns()) + .load(entry.load()) + .hostId(entry.hostId()) + .build(); + return new RingInstance(newEntry); + } + public static TokenRangeMapping buildTokenRangeMapping(int initialToken, ImmutableMap rfByDC, int instancesPerDC, diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CassandraTopologyMonitorTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CassandraTopologyMonitorTest.java index 0af57dcf7..4ed3b67bb 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CassandraTopologyMonitorTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CassandraTopologyMonitorTest.java @@ -59,8 +59,31 @@ void testTopologyChanged() assertThat(noChange.get()).isFalse(); } + // In environments where nodes are not bound to fixed IP addresses, a node that goes down can + // come back with a new IP while remaining the same logical instance — same hostname, tokens and + // data. This is routine in Kubernetes: a rescheduled pod keeps its identity but gets a new IP. + // The write is still correct, so the monitor must not report a topology change and cancel the + // job when instances differ only by IP address. + @Test + void testIpAddressChangeIsNotTopologyChange() + { + ClusterInfo mockClusterInfo = mock(ClusterInfo.class); + when(mockClusterInfo.getTokenRangeMapping(false)) + .thenReturn(buildTopology(10)) + .thenReturn(buildTopologyWithChangedIpAddresses(10)); // same instances, new IP addresses + AtomicBoolean noChange = new AtomicBoolean(true); + CassandraTopologyMonitor monitor = new CassandraTopologyMonitor(mockClusterInfo, event -> noChange.set(false)); + monitor.checkTopologyOnDemand(); + assertThat(noChange.get()).isTrue(); + } + private TokenRangeMapping buildTopology(int instancesCount) { return TokenRangeMappingUtils.buildTokenRangeMapping(0, ImmutableMap.of("DC1", 3), instancesCount); } + + private TokenRangeMapping buildTopologyWithChangedIpAddresses(int instancesCount) + { + return TokenRangeMappingUtils.buildTokenRangeMappingWithChangedIpAddresses(0, ImmutableMap.of("DC1", 3), instancesCount); + } }