diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java
index 05c43eca08..13b41e3e3c 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java
@@ -77,14 +77,13 @@ public LogFetchCollector(
/**
* Return the fetched log records, empty the record buffer and update the consumed position.
*
- *
- * <p>NOTE: returning empty records guarantees the consumed position are NOT updated.
- *
* @return The fetched records per partition
* @throws LogOffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
* the defaultResetPolicy is NONE
*/
public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) {
Map<TableBucket, List<ScanRecord>> fetched = new HashMap<>();
+ Map<TableBucket, Long> lastConsumedOffsets = new HashMap<>();
int recordsRemaining = maxPollRecords;
try {
@@ -120,8 +119,11 @@ public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) {
logFetchBuffer.poll();
} else {
List<ScanRecord> records = fetchRecords(nextInLineFetch, recordsRemaining);
+ TableBucket tableBucket = nextInLineFetch.tableBucket;
+ // Always record the advanced next fetch offset for this bucket, even when
+ // the materialized record list is empty.
+ lastConsumedOffsets.put(tableBucket, nextInLineFetch.nextFetchOffset());
if (!records.isEmpty()) {
- TableBucket tableBucket = nextInLineFetch.tableBucket;
List<ScanRecord> currentRecords = fetched.get(tableBucket);
if (currentRecords == null) {
fetched.put(tableBucket, records);
@@ -147,7 +149,11 @@ public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) {
}
}
- return new ScanRecords(fetched);
+ // Ensure every polled bucket appears in fetched so that buckets() reflects the polled set.
+ for (TableBucket polled : lastConsumedOffsets.keySet()) {
+ fetched.putIfAbsent(polled, Collections.emptyList());
+ }
+ return new ScanRecords(fetched, lastConsumedOffsets);
}
private List<ScanRecord> fetchRecords(CompletedFetch nextInLineFetch, int maxRecords) {
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
index 9a2dbf0b4c..0ee14a78bd 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
@@ -142,7 +142,8 @@ public ScanRecords poll(Duration timeout) {
long startNanos = System.nanoTime();
do {
ScanRecords scanRecords = pollForFetches();
- if (scanRecords.isEmpty()) {
+ // Gate on buckets() rather than isEmpty() so progress-only polls reach the caller.
+ if (scanRecords.buckets().isEmpty()) {
try {
if (!logFetcher.awaitNotEmpty(startNanos + timeoutNanos)) {
// logFetcher waits for the timeout and no data in buffer,
@@ -249,7 +250,8 @@ public void wakeup() {
private ScanRecords pollForFetches() {
ScanRecords scanRecords = logFetcher.collectFetch();
- if (!scanRecords.isEmpty()) {
+ // Check buckets() (includes progress-only buckets).
+ if (!scanRecords.buckets().isEmpty()) {
return scanRecords;
}
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java
index 9d58c22b49..a46d10d932 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java
@@ -22,6 +22,8 @@
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.utils.AbstractIterator;
+import javax.annotation.Nullable;
+
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -41,8 +43,18 @@ public class ScanRecords implements Iterable<ScanRecord> {
private final Map<TableBucket, List<ScanRecord>> records;
+ /** The exclusive upper bound of consumed offsets per polled bucket in this round. */
+ private final Map<TableBucket, Long> lastConsumedOffsets;
+
public ScanRecords(Map<TableBucket, List<ScanRecord>> records) {
+ this(records, Collections.emptyMap());
+ }
+
+ public ScanRecords(
+ Map<TableBucket, List<ScanRecord>> records,
+ Map<TableBucket, Long> lastConsumedOffsets) {
this.records = records;
+ this.lastConsumedOffsets = lastConsumedOffsets;
}
/**
@@ -59,15 +71,25 @@ public List<ScanRecord> records(TableBucket scanBucket) {
}
/**
- * Get the bucket ids which have records contained in this record set.
- *
- * @return the set of partitions with data in this record set (maybe empty if no data was
- * returned)
+ * Get the bucket ids that were polled in this round, including buckets whose record list is
+ * empty but whose log offset still advanced.
*/
public Set<TableBucket> buckets() {
return Collections.unmodifiableSet(records.keySet());
}
+ /**
+ * Get the exclusive upper bound of offsets consumed for the given bucket in this poll round.
+ *
+ * @param bucket the bucket to query
+ * @return the exclusive upper bound offset, or {@code null} if the bucket was not polled in
+ * this round
+ */
+ @Nullable
+ public Long lastConsumedOffset(TableBucket bucket) {
+ return lastConsumedOffsets.get(bucket);
+ }
+
/** The number of records for all buckets. */
public int count() {
int count = 0;
@@ -77,8 +99,9 @@ public int count() {
return count;
}
+ /** Returns {@code true} if this {@code ScanRecords} contains no materialized records. */
public boolean isEmpty() {
- return records.isEmpty();
+ return count() == 0;
}
@Override
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java
index e769ebf4ee..c7cce75c2a 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java
@@ -258,6 +258,9 @@ void testFilteredEmptyResponseAdvancesOffset() {
assertThat(scanRecords.records(tb)).isEmpty();
assertThat(logScannerStatus.getBucketOffset(tb)).isEqualTo(20L);
assertThat(completedFetch.isConsumed()).isTrue();
+ // Empty record list, but bucket exposed via buckets() with an advanced lastConsumedOffset.
+ assertThat(scanRecords.buckets()).contains(tb);
+ assertThat(scanRecords.lastConsumedOffset(tb)).isEqualTo(20L);
}
private DefaultCompletedFetch makeCompletedFetch(
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java
index 50addfcfe0..147be7922c 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java
@@ -149,7 +149,10 @@ void testFetchWithSchemaChange() throws Exception {
assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(2);
});
ScanRecords records = logFetcher.collectFetch();
- assertThat(records.buckets().size()).isEqualTo(1);
+ // Both polled buckets are exposed; tb1 was polled but produced no records.
+ TableBucket tb1 = new TableBucket(tableId, bucketId1);
+ assertThat(records.buckets()).containsExactlyInAnyOrder(tb0, tb1);
+ assertThat(records.records(tb1)).isEmpty();
List<ScanRecord> scanRecords = records.records(tb0);
assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList()))
.isEqualTo(expectedRows);
@@ -195,7 +198,8 @@ void testFetchWithSchemaChange() throws Exception {
assertThat(newSchemaLogFetcher.getCompletedFetchesSize()).isEqualTo(2);
});
records = newSchemaLogFetcher.collectFetch();
- assertThat(records.buckets().size()).isEqualTo(1);
+ assertThat(records.buckets()).containsExactlyInAnyOrder(tb0, tb1);
+ assertThat(records.records(tb1)).isEmpty();
assertThat(records.records(tb0)).hasSize(20);
scanRecords = records.records(tb0);
assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList()))
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java
index db4a326676..a71d3467b0 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java
@@ -25,6 +25,8 @@
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -57,4 +59,83 @@ void iterator() {
}
assertThat(c).isEqualTo(4);
}
+
+ /** Verifies the legacy single-arg constructor leaves {@code lastConsumedOffset} undefined. */
+ @Test
+ void legacyConstructorHasNoLastConsumedOffset() {
+ TableBucket tb = new TableBucket(0L, 0);
+ Map<TableBucket, List<ScanRecord>> records = new HashMap<>();
+ records.put(tb, Collections.emptyList());
+
+ ScanRecords scanRecords = new ScanRecords(records);
+
+ assertThat(scanRecords.buckets()).containsExactly(tb);
+ assertThat(scanRecords.lastConsumedOffset(tb)).isNull();
+ }
+
+ /**
+ * Verifies buckets with only advanced offsets are still exposed via {@link
+ * ScanRecords#buckets()} and carry their {@code lastConsumedOffset}.
+ */
+ @Test
+ void emptyBucketIsExposedViaBuckets() {
+ TableBucket bucketWithRecords = new TableBucket(0L, 0);
+ TableBucket emptyBucket = new TableBucket(0L, 1);
+
+ Map<TableBucket, List<ScanRecord>> records = new HashMap<>();
+ records.put(
+ bucketWithRecords,
+ Collections.singletonList(
+ new ScanRecord(5L, 1000L, ChangeType.INSERT, row(1, "a"))));
+ // Empty-progress buckets also appear in records (as emptyList).
+ records.put(emptyBucket, Collections.emptyList());
+
+ Map<TableBucket, Long> lastConsumedOffsets = new HashMap<>();
+ lastConsumedOffsets.put(bucketWithRecords, 6L);
+ lastConsumedOffsets.put(emptyBucket, 10L);
+
+ ScanRecords scanRecords = new ScanRecords(records, lastConsumedOffsets);
+
+ assertThat(scanRecords.buckets()).containsExactlyInAnyOrder(bucketWithRecords, emptyBucket);
+ assertThat(scanRecords.records(emptyBucket)).isEmpty();
+ assertThat(scanRecords.lastConsumedOffset(bucketWithRecords)).isEqualTo(6L);
+ assertThat(scanRecords.lastConsumedOffset(emptyBucket)).isEqualTo(10L);
+ assertThat(scanRecords.lastConsumedOffset(new TableBucket(0L, 99))).isNull();
+ }
+
+ /**
+ * Verifies {@link ScanRecords#isEmpty()} reflects only materialized records, regardless of
+ * advanced offsets.
+ */
+ @Test
+ void isEmptyReflectsOnlyMaterializedRecords() {
+ TableBucket tb = new TableBucket(0L, 0);
+
+ // No records and no progress: both isEmpty() and buckets() must be empty.
+ ScanRecords trulyEmpty = ScanRecords.EMPTY;
+ assertThat(trulyEmpty.isEmpty()).isTrue();
+ assertThat(trulyEmpty.buckets()).isEmpty();
+
+ // Progress-only round: isEmpty() must stay true (no materialized records),
+ // while buckets() must still expose the advanced bucket for callers to detect.
+ Map<TableBucket, List<ScanRecord>> emptyRecords = new HashMap<>();
+ emptyRecords.put(tb, Collections.emptyList());
+ Map<TableBucket, Long> progressOnly = new HashMap<>();
+ progressOnly.put(tb, 42L);
+ ScanRecords progressOnlyRecords = new ScanRecords(emptyRecords, progressOnly);
+ assertThat(progressOnlyRecords.isEmpty()).isTrue();
+ assertThat(progressOnlyRecords.buckets()).containsExactly(tb);
+ assertThat(progressOnlyRecords.lastConsumedOffset(tb)).isEqualTo(42L);
+
+ // Materialized records present: isEmpty() flips to false; legacy single-arg ctor still
+ // works.
+ Map<TableBucket, List<ScanRecord>> records = new HashMap<>();
+ records.put(
+ tb,
+ Collections.singletonList(
+ new ScanRecord(0L, 1000L, ChangeType.INSERT, row(1, "a"))));
+ ScanRecords withRecords = new ScanRecords(records);
+ assertThat(withRecords.isEmpty()).isFalse();
+ assertThat(withRecords.buckets()).containsExactly(tb);
+ }
}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
index d59787e15d..73f4ead211 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
@@ -355,63 +355,89 @@ private RecordsWithSplitIds> forLogRecords(
Map> writeResults = new HashMap<>();
Map<TableBucket, String> finishedSplitIds = new HashMap<>();
+ // Iterate every polled bucket, including those that only advanced their offset.
for (TableBucket bucket : scanRecords.buckets()) {
- List<ScanRecord> bucketScanRecords = scanRecords.records(bucket);
- if (bucketScanRecords.isEmpty()) {
- continue;
- }
// no any stopping offset, just skip handle the records for the bucket
Long stoppingOffset = currentTableStoppingOffsets.get(bucket);
if (stoppingOffset == null) {
continue;
}
+
+ List<ScanRecord> bucketScanRecords = scanRecords.records(bucket);
+ ScanRecord lastRecord = null;
LakeWriter lakeWriter = null;
- for (ScanRecord record : bucketScanRecords) {
- // if record is less than stopping offset
- if (record.logOffset() < stoppingOffset) {
- if (lakeWriter == null) {
- lakeWriter =
- getOrCreateLakeWriter(
- bucket,
- currentTableSplitsByBucket.get(bucket).getPartitionName());
- }
- lakeWriter.write(record);
- if (record.getSizeInBytes() > 0) {
- tieringMetrics.recordBytesRead(record.getSizeInBytes());
+
+ if (!bucketScanRecords.isEmpty()) {
+ for (ScanRecord record : bucketScanRecords) {
+ // only tier records within the split range [start, stoppingOffset).
+ if (record.logOffset() < stoppingOffset) {
+ if (lakeWriter == null) {
+ lakeWriter =
+ getOrCreateLakeWriter(
+ bucket,
+ currentTableSplitsByBucket
+ .get(bucket)
+ .getPartitionName());
+ }
+ lakeWriter.write(record);
+ if (record.getSizeInBytes() > 0) {
+ tieringMetrics.recordBytesRead(record.getSizeInBytes());
+ }
}
}
+ lastRecord = bucketScanRecords.get(bucketScanRecords.size() - 1);
+ }
+
+ // Prefer the scanner-reported lastConsumedOffset; fall back to the last record offset.
+ Long lastConsumedOffset = scanRecords.lastConsumedOffset(bucket);
+ boolean reachedEnd =
+ lastConsumedOffset != null
+ ? lastConsumedOffset >= stoppingOffset
+ : lastRecord != null && lastRecord.logOffset() >= stoppingOffset - 1;
+
+ if (!reachedEnd && lastRecord == null) {
+ continue;
+ }
+
+ // When reachedEnd, the tiered offset is stoppingOffset - 1 (exclusive bound).
+ // Timestamp prefers the current record, then the previously tracked value.
+ long tieredOffset = reachedEnd ? stoppingOffset - 1 : lastRecord.logOffset();
+ long tieredTimestamp;
+ if (lastRecord != null) {
+ tieredTimestamp = lastRecord.timestamp();
+ } else {
+ LogOffsetAndTimestamp latest = currentTableTieredOffsetAndTimestamp.get(bucket);
+ tieredTimestamp = latest != null ? latest.timestamp : UNKNOWN_BUCKET_TIMESTAMP;
}
- ScanRecord lastRecord = bucketScanRecords.get(bucketScanRecords.size() - 1);
currentTableTieredOffsetAndTimestamp.put(
- bucket,
- new LogOffsetAndTimestamp(lastRecord.logOffset(), lastRecord.timestamp()));
- // has arrived into the end of the split,
- if (lastRecord.logOffset() >= stoppingOffset - 1) {
- currentTableStoppingOffsets.remove(bucket);
- if (bucket.getPartitionId() != null) {
- currentLogScanner.unsubscribe(bucket.getPartitionId(), bucket.getBucket());
- } else {
- // todo: should unsubscribe the log split if unsubscribe bucket for
- // un-partitioned table is supported
- }
- TieringSplit currentTieringSplit = currentTableSplitsByBucket.remove(bucket);
- String currentSplitId = currentTieringSplit.splitId();
- // put write result of the bucket
- writeResults.put(
- bucket,
- completeLakeWriter(
- bucket,
- currentTieringSplit.getPartitionName(),
- stoppingOffset,
- lastRecord.timestamp()));
- // put split of the bucket
- finishedSplitIds.put(bucket, currentSplitId);
- LOG.info(
- "Finish tier bucket {} for table {}, split: {}.",
- bucket,
- currentTablePath,
- currentSplitId);
+ bucket, new LogOffsetAndTimestamp(tieredOffset, tieredTimestamp));
+
+ if (!reachedEnd) {
+ continue;
}
+
+ currentTableStoppingOffsets.remove(bucket);
+ if (bucket.getPartitionId() != null) {
+ currentLogScanner.unsubscribe(bucket.getPartitionId(), bucket.getBucket());
+ } else {
+ // todo: should unsubscribe the log split if unsubscribe bucket for
+ // un-partitioned table is supported
+ }
+ TieringSplit currentTieringSplit = currentTableSplitsByBucket.remove(bucket);
+ String currentSplitId = currentTieringSplit.splitId();
+ writeResults.put(
+ bucket,
+ completeLakeWriter(
+ bucket,
+ currentTieringSplit.getPartitionName(),
+ stoppingOffset,
+ tieredTimestamp));
+ finishedSplitIds.put(bucket, currentSplitId);
+ LOG.info(
+ "Finish tier bucket {} for table {}, split: {}.",
+ bucket,
+ currentTablePath,
+ currentSplitId);
}
if (!finishedSplitIds.isEmpty()) {
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java
index 171f521e00..9aad5d1c3d 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java
@@ -24,6 +24,7 @@
import org.apache.fluss.client.table.writer.TableWriter;
import org.apache.fluss.client.table.writer.UpsertWriter;
import org.apache.fluss.client.write.HashBucketAssigner;
+import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.flink.tiering.TestingLakeTieringFactory;
import org.apache.fluss.flink.tiering.TestingWriteResult;
import org.apache.fluss.flink.tiering.source.metrics.TieringMetrics;
@@ -33,12 +34,14 @@
import org.apache.fluss.flink.utils.FlinkTestBase;
import org.apache.fluss.lake.writer.LakeWriter;
import org.apache.fluss.lake.writer.WriterInitContext;
+import org.apache.fluss.metadata.MergeEngineType;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.record.LogRecord;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.encode.CompactedKeyEncoder;
+import org.apache.fluss.server.replica.Replica;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@@ -335,6 +338,93 @@ connection, new ThrowOnEmptyCompleteLakeTieringFactory())) {
}
}
+ /**
+ * Verifies that the tiering service finishes under {@code first_row} merge engine even when
+ * duplicate upserts produce empty WAL batches.
+ */
+ @Test
+ void testTieringFirstRowMergeEngineFinishes() throws Exception {
+ TablePath tablePath = TablePath.of("fluss", "tiering_first_row_finish");
+ TableDescriptor descriptor =
+ TableDescriptor.builder()
+ .schema(DEFAULT_PK_TABLE_SCHEMA)
+ .distributedBy(DEFAULT_BUCKET_NUM, "id")
+ .property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngineType.FIRST_ROW)
+ .build();
+ long tableId = createTable(tablePath, descriptor);
+
+ // Duplicate upserts under FIRST_ROW: only the first per id yields a CDC
+ // record, the rest become empty WAL batches that still advance the offset.
+ int distinctKeys = 5;
+ int duplicatesPerKey = 10;
+ try (Table table = conn.getTable(tablePath)) {
+ for (int round = 0; round < duplicatesPerKey; round++) {
+ UpsertWriter writer = table.newUpsert().createWriter();
+ for (int id = 0; id < distinctKeys; id++) {
+ writer.upsert(row(id, "v" + round));
+ }
+ writer.flush();
+ }
+ }
+
+ // Build log splits whose stoppingOffset equals the leader's current logEndOffset.
+ List<TieringSplit> logSplits = new ArrayList<>();
+ Set<String> splitIds = new HashSet<>();
+ long totalLogEndOffset = 0L;
+ for (int bucket = 0; bucket < DEFAULT_BUCKET_NUM; bucket++) {
+ TableBucket tb = new TableBucket(tableId, bucket);
+ Replica leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb);
+ long stoppingOffset = leader.getLogTablet().localLogEndOffset();
+ totalLogEndOffset += stoppingOffset;
+ if (stoppingOffset <= 0) {
+ continue;
+ }
+ TieringLogSplit split =
+ createLogSplit(tablePath, tableId, bucket, EARLIEST_OFFSET, stoppingOffset);
+ logSplits.add(split);
+ splitIds.add(split.splitId());
+ }
+ assertThat(logSplits).isNotEmpty();
+ // Pre-condition: total log offsets must exceed distinct-key count, otherwise
+ // no empty batch was produced.
+ assertThat(totalLogEndOffset)
+ .as(
+ "Expected logEndOffset (%d) to exceed distinctKeys (%d) so that "
+ + "empty batches are produced under FIRST_ROW",
+ totalLogEndOffset, distinctKeys)
+ .isGreaterThan(distinctKeys);
+
+ try (Connection connection =
+ ConnectionFactory.createConnection(
+ FLUSS_CLUSTER_EXTENSION.getClientConfig());
+ TieringSplitReader tieringSplitReader =
+ createTieringReader(connection)) {
+ tieringSplitReader.handleSplitsChanges(new SplitsAddition<>(logSplits));
+
+ // With the fix every split must finish within a few fetch rounds.
+ Set<String> finished = new HashSet<>();
+ int maxRounds = 10;
+ for (int i = 0; i < maxRounds && !finished.containsAll(splitIds); i++) {
+ RecordsWithSplitIds> fetchResult =
+ tieringSplitReader.fetch();
+ finished.addAll(fetchResult.finishedSplits());
+ // drain the iterator so that the reader advances internal state
+ while (fetchResult.nextSplit() != null) {
+ while (fetchResult.nextRecordFromSplit() != null) {
+ // consume
+ }
+ }
+ }
+
+ assertThat(finished)
+ .as(
+ "All tiering splits must finish under FIRST_ROW merge engine "
+ + "with duplicate keys. Finished: %s, expected: %s",
+ finished, splitIds)
+ .containsAll(splitIds);
+ }
+ }
+
private TieringSplitReader createTieringReader(Connection connection) {
final TieringMetrics tieringMetrics =
new TieringMetrics(