From d3984d12f18fca638d5b5391c703d2afd02b9ac3 Mon Sep 17 00:00:00 2001 From: duankaixuan <1417048384@qq.com> Date: Thu, 30 Apr 2026 21:51:46 +0800 Subject: [PATCH 1/5] [client] Fix tiering hang on first_row merge engine empty batches --- .../table/scanner/log/LogFetchCollector.java | 12 +- .../client/table/scanner/log/ScanRecords.java | 50 ++++++++ .../scanner/log/LogFetchCollectorTest.java | 6 + .../table/scanner/log/ScanRecordsTest.java | 48 +++++++ .../tiering/source/TieringSplitReader.java | 118 +++++++++++------- .../source/TieringSplitReaderTest.java | 107 ++++++++++++++++ 6 files changed, 292 insertions(+), 49 deletions(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java index 05c43eca08..5a3a9b101b 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java @@ -85,6 +85,11 @@ public LogFetchCollector( */ public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) { Map> fetched = new HashMap<>(); + // Tracks the next fetch offset for every bucket polled in this round, even when the + // returned record list is empty (e.g. empty WAL batches produced by the FIRST_ROW + // merge engine, see issue #2371). This lets callers (such as the tiering service) + // detect that the log offset has advanced past empty batches. + Map nextOffsets = new HashMap<>(); int recordsRemaining = maxPollRecords; try { @@ -120,8 +125,11 @@ public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) { logFetchBuffer.poll(); } else { List records = fetchRecords(nextInLineFetch, recordsRemaining); + TableBucket tableBucket = nextInLineFetch.tableBucket; + // Always record the advanced next fetch offset for this bucket, even when + // the materialized record list is empty. + nextOffsets.put(tableBucket, nextInLineFetch.nextFetchOffset()); if (!records.isEmpty()) { - TableBucket tableBucket = nextInLineFetch.tableBucket; List currentRecords = fetched.get(tableBucket); if (currentRecords == null) { fetched.put(tableBucket, records); @@ -147,7 +155,7 @@ public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) { } } - return new ScanRecords(fetched); + return new ScanRecords(fetched, nextOffsets); } private List fetchRecords(CompletedFetch nextInLineFetch, int maxRecords) { diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java index 9d58c22b49..f8f89d8046 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java @@ -22,7 +22,10 @@ import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.utils.AbstractIterator; +import javax.annotation.Nullable; + import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -41,8 +44,22 @@ public class ScanRecords implements Iterable { private final Map> records; + /** + * The next log offset (exclusive upper bound of consumed records) for each bucket polled in + * this round. Includes buckets that produced zero records but whose log offset advanced (e.g. + * empty WAL batches generated by the FIRST_ROW merge engine when the upserted key already + * exists). See FLUSS-2371. + */ + private final Map nextLogOffsets; + public ScanRecords(Map> records) { + this(records, Collections.emptyMap()); + } + + public ScanRecords( + Map> records, Map nextLogOffsets) { this.records = records; + this.nextLogOffsets = nextLogOffsets; } /** @@ -68,6 +85,39 @@ public Set buckets() { return Collections.unmodifiableSet(records.keySet()); } + /** + * Get all bucket ids that were polled in this round, including buckets whose record list is + * empty but whose log offset still advanced (e.g. empty WAL batches produced by the FIRST_ROW + * merge engine). Batch readers that need to detect end-of-range based on offset progress (such + * as the tiering service) should iterate over this set instead of {@link #buckets()}. + * + * @return the union of buckets exposed via {@link #buckets()} and buckets that only have an + * advanced {@code nextLogOffset}. + */ + public Set polledBuckets() { + if (nextLogOffsets.isEmpty()) { + return buckets(); + } + Set all = new HashSet<>(records.keySet()); + all.addAll(nextLogOffsets.keySet()); + return Collections.unmodifiableSet(all); + } + + /** + * The next log offset that the scanner will fetch from for the given bucket in subsequent + * polls. This offset is always advanced past empty WAL batches even when no {@link ScanRecord} + * is materialized for the bucket, which allows callers (e.g. the tiering service) to detect + * end-of-range correctly even under the FIRST_ROW merge engine. + * + * @param bucket the bucket to query + * @return the exclusive upper bound offset consumed in this round, or {@code null} if the + * bucket was not polled in this round. + */ + @Nullable + public Long nextLogOffset(TableBucket bucket) { + return nextLogOffsets.get(bucket); + } + /** The number of records for all buckets. */ public int count() { int count = 0; diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java index e769ebf4ee..a9f0d55b65 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java @@ -258,6 +258,12 @@ void testFilteredEmptyResponseAdvancesOffset() { assertThat(scanRecords.records(tb)).isEmpty(); assertThat(logScannerStatus.getBucketOffset(tb)).isEqualTo(20L); assertThat(completedFetch.isConsumed()).isTrue(); + // Although the materialized record list is empty, the bucket must be exposed + // through polledBuckets() with an advanced nextLogOffset so that batch readers + // (e.g. the tiering service) can detect end-of-range. See issue #2371. + assertThat(scanRecords.buckets()).doesNotContain(tb); + assertThat(scanRecords.polledBuckets()).contains(tb); + assertThat(scanRecords.nextLogOffset(tb)).isEqualTo(20L); } private DefaultCompletedFetch makeCompletedFetch( diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java index db4a326676..0ef89193ea 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java @@ -25,6 +25,8 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; @@ -57,4 +59,50 @@ void iterator() { } assertThat(c).isEqualTo(4); } + + /** Verifies the legacy single-arg constructor leaves {@code nextLogOffset} undefined. */ + @Test + void legacyConstructorHasNoNextLogOffset() { + TableBucket tb = new TableBucket(0L, 0); + Map> records = new HashMap<>(); + records.put(tb, Collections.emptyList()); + + ScanRecords scanRecords = new ScanRecords(records); + + assertThat(scanRecords.buckets()).containsExactly(tb); + assertThat(scanRecords.polledBuckets()).containsExactly(tb); + assertThat(scanRecords.nextLogOffset(tb)).isNull(); + } + + /** + * Verifies that buckets which only produced empty WAL batches (no {@link ScanRecord}) are + * exposed via {@link ScanRecords#polledBuckets()} and {@link ScanRecords#nextLogOffset}, while + * the legacy {@link ScanRecords#buckets()} keeps its old semantics. This is the API surface + * relied on by the tiering service to fix issue #2371. + */ + @Test + void emptyBucketIsExposedViaPolledBuckets() { + TableBucket bucketWithRecords = new TableBucket(0L, 0); + TableBucket emptyBucket = new TableBucket(0L, 1); + + Map> records = new HashMap<>(); + records.put( + bucketWithRecords, + Collections.singletonList( + new ScanRecord(5L, 1000L, ChangeType.INSERT, row(1, "a")))); + + Map nextOffsets = new HashMap<>(); + nextOffsets.put(bucketWithRecords, 6L); + nextOffsets.put(emptyBucket, 10L); + + ScanRecords scanRecords = new ScanRecords(records, nextOffsets); + + assertThat(scanRecords.buckets()).containsExactly(bucketWithRecords); + assertThat(scanRecords.polledBuckets()) + .containsExactlyInAnyOrder(bucketWithRecords, emptyBucket); + assertThat(scanRecords.records(emptyBucket)).isEmpty(); + assertThat(scanRecords.nextLogOffset(bucketWithRecords)).isEqualTo(6L); + assertThat(scanRecords.nextLogOffset(emptyBucket)).isEqualTo(10L); + assertThat(scanRecords.nextLogOffset(new TableBucket(0L, 99))).isNull(); + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java index d59787e15d..429eea5480 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java @@ -355,63 +355,87 @@ private RecordsWithSplitIds> forLogRecords( Map> writeResults = new HashMap<>(); Map finishedSplitIds = new HashMap<>(); - for (TableBucket bucket : scanRecords.buckets()) { - List bucketScanRecords = scanRecords.records(bucket); - if (bucketScanRecords.isEmpty()) { - continue; - } + // Iterate polledBuckets() (instead of buckets()) so that buckets which only produced + // empty WAL batches (e.g. duplicate upserts under the FIRST_ROW merge engine) are + // also visited. Otherwise the tiering service would loop forever on such buckets + // because their log offset has advanced but no ScanRecord materialized. See + // https://github.com/apache/fluss/issues/2371. + for (TableBucket bucket : scanRecords.polledBuckets()) { // no any stopping offset, just skip handle the records for the bucket Long stoppingOffset = currentTableStoppingOffsets.get(bucket); if (stoppingOffset == null) { continue; } + + List bucketScanRecords = scanRecords.records(bucket); + Long nextLogOffset = scanRecords.nextLogOffset(bucket); LakeWriter lakeWriter = null; - for (ScanRecord record : bucketScanRecords) { - // if record is less than stopping offset - if (record.logOffset() < stoppingOffset) { - if (lakeWriter == null) { - lakeWriter = - getOrCreateLakeWriter( - bucket, - currentTableSplitsByBucket.get(bucket).getPartitionName()); - } - lakeWriter.write(record); - if (record.getSizeInBytes() > 0) { - tieringMetrics.recordBytesRead(record.getSizeInBytes()); + + if (!bucketScanRecords.isEmpty()) { + for (ScanRecord record : bucketScanRecords) { + // if record is less than stopping offset + if (record.logOffset() < stoppingOffset) { + if (lakeWriter == null) { + lakeWriter = + getOrCreateLakeWriter( + bucket, + currentTableSplitsByBucket.get(bucket).getPartitionName()); + } + lakeWriter.write(record); + if (record.getSizeInBytes() > 0) { + tieringMetrics.recordBytesRead(record.getSizeInBytes()); + } } } - } - ScanRecord lastRecord = bucketScanRecords.get(bucketScanRecords.size() - 1); - currentTableTieredOffsetAndTimestamp.put( - bucket, - new LogOffsetAndTimestamp(lastRecord.logOffset(), lastRecord.timestamp())); - // has arrived into the end of the split, - if (lastRecord.logOffset() >= stoppingOffset - 1) { - currentTableStoppingOffsets.remove(bucket); - if (bucket.getPartitionId() != null) { - currentLogScanner.unsubscribe(bucket.getPartitionId(), bucket.getBucket()); - } else { - // todo: should unsubscribe the log split if unsubscribe bucket for - // un-partitioned table is supported - } - TieringSplit currentTieringSplit = currentTableSplitsByBucket.remove(bucket); - String currentSplitId = currentTieringSplit.splitId(); - // put write result of the bucket - writeResults.put( + ScanRecord lastRecord = bucketScanRecords.get(bucketScanRecords.size() - 1); + currentTableTieredOffsetAndTimestamp.put( bucket, - completeLakeWriter( - bucket, - currentTieringSplit.getPartitionName(), - stoppingOffset, - lastRecord.timestamp())); - // put split of the bucket - finishedSplitIds.put(bucket, currentSplitId); - LOG.info( - "Finish tier bucket {} for table {}, split: {}.", - bucket, - currentTablePath, - currentSplitId); + new LogOffsetAndTimestamp(lastRecord.logOffset(), lastRecord.timestamp())); + } + + // Decide whether the split has reached its end. Prefer the scanner-reported + // nextLogOffset because it advances even when the bucket only produced empty + // batches; fall back to the last record offset for backwards compatibility. + boolean reachedEnd; + if (nextLogOffset != null) { + reachedEnd = nextLogOffset >= stoppingOffset; + } else if (!bucketScanRecords.isEmpty()) { + ScanRecord lastRecord = bucketScanRecords.get(bucketScanRecords.size() - 1); + reachedEnd = lastRecord.logOffset() >= stoppingOffset - 1; + } else { + reachedEnd = false; + } + if (!reachedEnd) { + continue; } + + currentTableStoppingOffsets.remove(bucket); + if (bucket.getPartitionId() != null) { + currentLogScanner.unsubscribe(bucket.getPartitionId(), bucket.getBucket()); + } else { + // todo: should unsubscribe the log split if unsubscribe bucket for + // un-partitioned table is supported + } + TieringSplit currentTieringSplit = currentTableSplitsByBucket.remove(bucket); + String currentSplitId = currentTieringSplit.splitId(); + // Use the latest known record timestamp for the bucket; fall back to + // UNKNOWN_BUCKET_TIMESTAMP when the split only contained empty batches and we + // never observed a real record. + LogOffsetAndTimestamp latest = currentTableTieredOffsetAndTimestamp.get(bucket); + long finishTimestamp = latest != null ? latest.timestamp : UNKNOWN_BUCKET_TIMESTAMP; + writeResults.put( + bucket, + completeLakeWriter( + bucket, + currentTieringSplit.getPartitionName(), + stoppingOffset, + finishTimestamp)); + finishedSplitIds.put(bucket, currentSplitId); + LOG.info( + "Finish tier bucket {} for table {}, split: {}.", + bucket, + currentTablePath, + currentSplitId); } if (!finishedSplitIds.isEmpty()) { diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java index 171f521e00..2cf0a9dfee 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java @@ -24,6 +24,7 @@ import org.apache.fluss.client.table.writer.TableWriter; import org.apache.fluss.client.table.writer.UpsertWriter; import org.apache.fluss.client.write.HashBucketAssigner; +import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.flink.tiering.TestingLakeTieringFactory; import org.apache.fluss.flink.tiering.TestingWriteResult; import org.apache.fluss.flink.tiering.source.metrics.TieringMetrics; @@ -33,12 +34,14 @@ import org.apache.fluss.flink.utils.FlinkTestBase; import org.apache.fluss.lake.writer.LakeWriter; import org.apache.fluss.lake.writer.WriterInitContext; +import org.apache.fluss.metadata.MergeEngineType; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.record.LogRecord; import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.encode.CompactedKeyEncoder; +import org.apache.fluss.server.replica.Replica; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; @@ -335,6 +338,110 @@ connection, new ThrowOnEmptyCompleteLakeTieringFactory())) { } } + /** + * Regression test for Issue #2371: + * tiering service used to hang forever when the source table uses {@code first_row} merge + * engine and contains duplicate keys. + * + *

Root cause: when {@code MergeEngineType=FIRST_ROW} drops a duplicate upsert, the server + * still appends an "empty" WAL batch (recordCount=0) so the log offset advances. The client's + * {@code LogFetchCollector} previously only exposed buckets with non-empty record lists, so the + * {@link org.apache.fluss.client.table.scanner.log.ScanRecords} returned to {@code + * TieringSplitReader#forLogRecords} contained no entry for such buckets and the underlying + * {@code nextFetchOffset} was invisible to the tiering layer. + * + *

The fix exposes per-bucket {@code nextLogOffset} via {@code ScanRecords}, allowing {@code + * forLogRecords} to detect end-of-range even when only empty batches were observed. + */ + @Test + void testTieringFirstRowMergeEngineFinishes() throws Exception { + TablePath tablePath = TablePath.of("fluss", "tiering_first_row_finish"); + TableDescriptor descriptor = + TableDescriptor.builder() + .schema(DEFAULT_PK_TABLE_SCHEMA) + .distributedBy(DEFAULT_BUCKET_NUM, "id") + .property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngineType.FIRST_ROW) + .build(); + long tableId = createTable(tablePath, descriptor); + + // Write the same primary keys multiple times, flushing after EACH round so + // the writes hit the server as separate batches. Under FIRST_ROW only the + // first upsert per id produces a CDC record; subsequent writes for already + // existing keys become empty WAL batches that still bump the log offset. + int distinctKeys = 5; + int duplicatesPerKey = 10; + try (Table table = conn.getTable(tablePath)) { + for (int round = 0; round < duplicatesPerKey; round++) { + UpsertWriter writer = table.newUpsert().createWriter(); + for (int id = 0; id < distinctKeys; id++) { + writer.upsert(row(id, "v" + round)); + } + writer.flush(); + } + } + + // Build log splits whose stoppingOffset equals the leader's current logEndOffset. + // For at least one bucket logEndOffset > number of CDC records, which is the + // condition that triggers the hang. + List logSplits = new ArrayList<>(); + Set splitIds = new HashSet<>(); + long totalLogEndOffset = 0L; + for (int bucket = 0; bucket < DEFAULT_BUCKET_NUM; bucket++) { + TableBucket tb = new TableBucket(tableId, bucket); + Replica leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb); + long stoppingOffset = leader.getLogTablet().localLogEndOffset(); + totalLogEndOffset += stoppingOffset; + if (stoppingOffset <= 0) { + continue; + } + TieringLogSplit split = + createLogSplit(tablePath, tableId, bucket, EARLIEST_OFFSET, stoppingOffset); + logSplits.add(split); + splitIds.add(split.splitId()); + } + assertThat(logSplits).isNotEmpty(); + // Sanity-check the precondition for the bug: total log offsets across buckets + // must exceed the number of distinct-key CDC records, otherwise no empty + // batch was produced and the bug cannot manifest. + assertThat(totalLogEndOffset) + .as( + "Expected logEndOffset (%d) to exceed distinctKeys (%d) so that " + + "empty batches are produced under FIRST_ROW", + totalLogEndOffset, distinctKeys) + .isGreaterThan(distinctKeys); + + try (Connection connection = + ConnectionFactory.createConnection( + FLUSS_CLUSTER_EXTENSION.getClientConfig()); + TieringSplitReader tieringSplitReader = + createTieringReader(connection)) { + tieringSplitReader.handleSplitsChanges(new SplitsAddition<>(logSplits)); + + // Drive the reader. With the fix in place every split must finish within a + // few fetch rounds even though most polled batches are empty under FIRST_ROW. + Set finished = new HashSet<>(); + int maxRounds = 10; + for (int i = 0; i < maxRounds && !finished.containsAll(splitIds); i++) { + RecordsWithSplitIds> fetchResult = + tieringSplitReader.fetch(); + finished.addAll(fetchResult.finishedSplits()); + // drain the iterator so that the reader advances internal state + while (fetchResult.nextSplit() != null) { + while (fetchResult.nextRecordFromSplit() != null) { + // consume + } + } + } + + assertThat(finished) + .as( + "All tiering splits must finish under FIRST_ROW merge engine " + + "with duplicate keys (issue #2371). Finished: %s, expected: %s", + finished, splitIds) + .containsAll(splitIds); + } + } + private TieringSplitReader createTieringReader(Connection connection) { final TieringMetrics tieringMetrics = new TieringMetrics( From 470aee00f766237b8c3370f82b189c6f287e6afe Mon Sep 17 00:00:00 2001 From: duankaixuan <1417048384@qq.com> Date: Fri, 8 May 2026 15:42:30 +0800 Subject: [PATCH 2/5] Address feedback --- .../table/scanner/log/LogFetchCollector.java | 10 +++++-- .../client/table/scanner/log/ScanRecords.java | 13 +++++++- .../table/scanner/log/ScanRecordsTest.java | 30 +++++++++++++++++++ 3 files changed, 50 insertions(+), 3 deletions(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java index 5a3a9b101b..b119cd3f9b 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java @@ -77,9 +77,15 @@ public LogFetchCollector( /** * Return the fetched log records, empty the record buffer and update the consumed position. * - *

NOTE: returning empty records guarantees the consumed position are NOT updated. + *

The returned {@link ScanRecords#records(TableBucket)} may be empty for a bucket even when + * the consumed position was advanced — e.g. when a {@link LogRecordBatch} is fully filtered out + * or when the FIRST_ROW merge engine emits empty WAL batches. In such cases the next fetch + * offset is still surfaced via {@link ScanRecords#nextLogOffset(TableBucket)} so that callers + * (e.g. the tiering service) can detect end-of-range based on offset progress alone. See FLUSS-2371. * - * @return The fetched records per partition + * @return The fetched records per partition, plus the next log offset for every bucket polled + * in this round. * @throws LogOffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and * the defaultResetPolicy is NONE */ diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java index f8f89d8046..c6604a9af8 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java @@ -127,8 +127,19 @@ public int count() { return count; } + /** + * Returns {@code true} if this {@code ScanRecords} carries neither materialized records nor any + * advanced {@code nextLogOffset} information. + * + *

Note that a poll round can make progress (advance the next fetch offset for a bucket) + * without producing any {@link ScanRecord}, e.g. when the FIRST_ROW merge engine emits empty + * WAL batches because the upserted key already exists. Such "progress-only" results are + * intentionally NOT considered empty so that callers gating on {@code isEmpty()} (e.g. {@link + * LogScanner#poll}) do not discard the {@code nextLogOffset} information and re-introduce the + * tiering hang fixed by FLUSS-2371. + */ public boolean isEmpty() { - return records.isEmpty(); + return records.isEmpty() && nextLogOffsets.isEmpty(); } @Override diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java index 0ef89193ea..002bc33214 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java @@ -105,4 +105,34 @@ void emptyBucketIsExposedViaPolledBuckets() { assertThat(scanRecords.nextLogOffset(emptyBucket)).isEqualTo(10L); assertThat(scanRecords.nextLogOffset(new TableBucket(0L, 99))).isNull(); } + + /** + * Verifies that {@link ScanRecords#isEmpty()} treats a "progress-only" poll round (no + * materialized records but advanced {@code nextLogOffset}) as non-empty, so that {@link + * LogScannerImpl#poll} does not block in {@code awaitNotEmpty} and discard the offset progress + * (regression guard for issue #2371). + */ + @Test + void isEmptyConsidersAdvancedNextLogOffsets() { + TableBucket tb = new TableBucket(0L, 0); + + // Both records and nextLogOffsets are empty -> truly empty. + ScanRecords trulyEmpty = ScanRecords.EMPTY; + assertThat(trulyEmpty.isEmpty()).isTrue(); + + // Records empty, but a bucket advanced its next fetch offset -> NOT empty. + Map progressOnly = new HashMap<>(); + progressOnly.put(tb, 42L); + ScanRecords progressOnlyRecords = new ScanRecords(Collections.emptyMap(), progressOnly); + assertThat(progressOnlyRecords.isEmpty()).isFalse(); + + // Records non-empty -> NOT empty (legacy behaviour preserved). + Map> records = new HashMap<>(); + records.put( + tb, + Collections.singletonList( + new ScanRecord(0L, 1000L, ChangeType.INSERT, row(1, "a")))); + ScanRecords withRecords = new ScanRecords(records); + assertThat(withRecords.isEmpty()).isFalse(); + } } From 6e387657bc09e32c2578719a410fd4bbeb1b3380 Mon Sep 17 00:00:00 2001 From: duankaixuan <1417048384@qq.com> Date: Fri, 8 May 2026 18:44:56 +0800 Subject: [PATCH 3/5] Revert isEmpty() semantics change, gate on polledBuckets() in LogScannerImpl --- .../table/scanner/log/LogFetchCollector.java | 5 ++++ .../table/scanner/log/LogScannerImpl.java | 12 +++++++-- .../client/table/scanner/log/ScanRecords.java | 14 ++--------- .../table/scanner/log/ScanRecordsTest.java | 25 ++++++++++++------- 4 files changed, 33 insertions(+), 23 deletions(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java index b119cd3f9b..4477932c9a 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java @@ -84,6 +84,11 @@ public LogFetchCollector( * (e.g. the tiering service) can detect end-of-range based on offset progress alone. See FLUSS-2371. * + *

Note that {@link ScanRecords#isEmpty()} only reflects whether any materialized records + * were produced and does NOT account for such progress-only rounds. Callers that need to detect + * offset progress should use {@link ScanRecords#polledBuckets()} or {@link + * ScanRecords#nextLogOffset(TableBucket)} instead. + * * @return The fetched records per partition, plus the next log offset for every bucket polled * in this round. * @throws LogOffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java index 9a2dbf0b4c..2762c8f1a2 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java @@ -142,7 +142,12 @@ public ScanRecords poll(Duration timeout) { long startNanos = System.nanoTime(); do { ScanRecords scanRecords = pollForFetches(); - if (scanRecords.isEmpty()) { + // Gate on polledBuckets() (records OR advanced nextLogOffsets) rather than + // isEmpty() so that a "progress-only" poll round (e.g. empty WAL batches + // produced by the FIRST_ROW merge engine, see FLUSS-2371) is surfaced to the + // caller instead of being discarded here, which would drop the nextLogOffset + // information and re-introduce the tiering hang. + if (scanRecords.polledBuckets().isEmpty()) { try { if (!logFetcher.awaitNotEmpty(startNanos + timeoutNanos)) { // logFetcher waits for the timeout and no data in buffer, @@ -249,7 +254,10 @@ public void wakeup() { private ScanRecords pollForFetches() { ScanRecords scanRecords = logFetcher.collectFetch(); - if (!scanRecords.isEmpty()) { + // Check polledBuckets() instead of isEmpty() so a progress-only poll (advanced + // nextLogOffsets with no materialized records) is returned to the caller rather + // than being re-collected, which would drop the nextLogOffset information. + if (!scanRecords.polledBuckets().isEmpty()) { return scanRecords; } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java index c6604a9af8..2f11202d4c 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java @@ -127,19 +127,9 @@ public int count() { return count; } - /** - * Returns {@code true} if this {@code ScanRecords} carries neither materialized records nor any - * advanced {@code nextLogOffset} information. - * - *

Note that a poll round can make progress (advance the next fetch offset for a bucket) - * without producing any {@link ScanRecord}, e.g. when the FIRST_ROW merge engine emits empty - * WAL batches because the upserted key already exists. Such "progress-only" results are - * intentionally NOT considered empty so that callers gating on {@code isEmpty()} (e.g. {@link - * LogScanner#poll}) do not discard the {@code nextLogOffset} information and re-introduce the - * tiering hang fixed by FLUSS-2371. - */ + /** Returns {@code true} if this {@code ScanRecords} contains no materialized records. */ public boolean isEmpty() { - return records.isEmpty() && nextLogOffsets.isEmpty(); + return records.isEmpty(); } @Override diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java index 002bc33214..b6894ea341 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java @@ -107,26 +107,32 @@ void emptyBucketIsExposedViaPolledBuckets() { } /** - * Verifies that {@link ScanRecords#isEmpty()} treats a "progress-only" poll round (no - * materialized records but advanced {@code nextLogOffset}) as non-empty, so that {@link - * LogScannerImpl#poll} does not block in {@code awaitNotEmpty} and discard the offset progress - * (regression guard for issue #2371). + * Verifies that {@link ScanRecords#isEmpty()} reflects only whether materialized records are + * present, regardless of advanced {@code nextLogOffsets}. The "progress-only" gating used by + * {@link LogScannerImpl#poll} (regression guard for issue #2371) is built on top of {@link + * ScanRecords#polledBuckets()} instead, so the public {@code isEmpty()} contract stays + * compatible with callers that treat empty as "no records to consume". */ @Test - void isEmptyConsidersAdvancedNextLogOffsets() { + void isEmptyReflectsOnlyMaterializedRecords() { TableBucket tb = new TableBucket(0L, 0); - // Both records and nextLogOffsets are empty -> truly empty. + // Both records and nextLogOffsets are empty -> empty. ScanRecords trulyEmpty = ScanRecords.EMPTY; assertThat(trulyEmpty.isEmpty()).isTrue(); + assertThat(trulyEmpty.polledBuckets()).isEmpty(); - // Records empty, but a bucket advanced its next fetch offset -> NOT empty. + // Records empty, but a bucket advanced its next fetch offset -> still isEmpty(), + // but polledBuckets() must surface the advanced bucket so LogScannerImpl can + // forward the progress to its caller. Map progressOnly = new HashMap<>(); progressOnly.put(tb, 42L); ScanRecords progressOnlyRecords = new ScanRecords(Collections.emptyMap(), progressOnly); - assertThat(progressOnlyRecords.isEmpty()).isFalse(); + assertThat(progressOnlyRecords.isEmpty()).isTrue(); + assertThat(progressOnlyRecords.polledBuckets()).containsExactly(tb); + assertThat(progressOnlyRecords.nextLogOffset(tb)).isEqualTo(42L); - // Records non-empty -> NOT empty (legacy behaviour preserved). + // Records non-empty -> NOT empty. Map> records = new HashMap<>(); records.put( tb, @@ -134,5 +140,6 @@ void isEmptyConsidersAdvancedNextLogOffsets() { new ScanRecord(0L, 1000L, ChangeType.INSERT, row(1, "a")))); ScanRecords withRecords = new ScanRecords(records); assertThat(withRecords.isEmpty()).isFalse(); + assertThat(withRecords.polledBuckets()).containsExactly(tb); } } From 996de583ea969a8d708e303a95fa576e7dc6e49c Mon Sep 17 00:00:00 2001 From: duankaixuan <1417048384@qq.com> Date: Sun, 10 May 2026 17:34:08 +0800 Subject: [PATCH 4/5] Address fallback --- .../table/scanner/log/LogFetchCollector.java | 29 +++------ .../table/scanner/log/LogScannerImpl.java | 14 ++-- .../client/table/scanner/log/ScanRecords.java | 54 ++++------------ .../scanner/log/LogFetchCollectorTest.java | 9 +-- .../table/scanner/log/ScanRecordsTest.java | 64 +++++++++---------- .../tiering/source/TieringSplitReader.java | 60 ++++++++--------- .../source/TieringSplitReaderTest.java | 33 +++------- 7 files changed, 97 insertions(+), 166 deletions(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java index 4477932c9a..13b41e3e3c 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java @@ -77,30 +77,13 @@ public LogFetchCollector( /** * Return the fetched log records, empty the record buffer and update the consumed position. * - *

The returned {@link ScanRecords#records(TableBucket)} may be empty for a bucket even when - * the consumed position was advanced — e.g. when a {@link LogRecordBatch} is fully filtered out - * or when the FIRST_ROW merge engine emits empty WAL batches. In such cases the next fetch - * offset is still surfaced via {@link ScanRecords#nextLogOffset(TableBucket)} so that callers - * (e.g. the tiering service) can detect end-of-range based on offset progress alone. See FLUSS-2371. - * - *

Note that {@link ScanRecords#isEmpty()} only reflects whether any materialized records - * were produced and does NOT account for such progress-only rounds. Callers that need to detect - * offset progress should use {@link ScanRecords#polledBuckets()} or {@link - * ScanRecords#nextLogOffset(TableBucket)} instead. - * - * @return The fetched records per partition, plus the next log offset for every bucket polled - * in this round. + * @return The fetched records per partition * @throws LogOffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and * the defaultResetPolicy is NONE */ public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) { Map> fetched = new HashMap<>(); - // Tracks the next fetch offset for every bucket polled in this round, even when the - // returned record list is empty (e.g. empty WAL batches produced by the FIRST_ROW - // merge engine, see issue #2371). This lets callers (such as the tiering service) - // detect that the log offset has advanced past empty batches. - Map nextOffsets = new HashMap<>(); + Map lastConsumedOffsets = new HashMap<>(); int recordsRemaining = maxPollRecords; try { @@ -139,7 +122,7 @@ public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) { TableBucket tableBucket = nextInLineFetch.tableBucket; // Always record the advanced next fetch offset for this bucket, even when // the materialized record list is empty. - nextOffsets.put(tableBucket, nextInLineFetch.nextFetchOffset()); + lastConsumedOffsets.put(tableBucket, nextInLineFetch.nextFetchOffset()); if (!records.isEmpty()) { List currentRecords = fetched.get(tableBucket); if (currentRecords == null) { @@ -166,7 +149,11 @@ public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) { } } - return new ScanRecords(fetched, nextOffsets); + // Ensure every polled bucket appears in fetched so that buckets() reflects the polled set. + for (TableBucket polled : lastConsumedOffsets.keySet()) { + fetched.putIfAbsent(polled, Collections.emptyList()); + } + return new ScanRecords(fetched, lastConsumedOffsets); } private List fetchRecords(CompletedFetch nextInLineFetch, int maxRecords) { diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java index 2762c8f1a2..0ee14a78bd 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java @@ -142,12 +142,8 @@ public ScanRecords poll(Duration timeout) { long startNanos = System.nanoTime(); do { ScanRecords scanRecords = pollForFetches(); - // Gate on polledBuckets() (records OR advanced nextLogOffsets) rather than - // isEmpty() so that a "progress-only" poll round (e.g. empty WAL batches - // produced by the FIRST_ROW merge engine, see FLUSS-2371) is surfaced to the - // caller instead of being discarded here, which would drop the nextLogOffset - // information and re-introduce the tiering hang. - if (scanRecords.polledBuckets().isEmpty()) { + // Gate on buckets() rather than isEmpty() so progress-only polls reach the caller. + if (scanRecords.buckets().isEmpty()) { try { if (!logFetcher.awaitNotEmpty(startNanos + timeoutNanos)) { // logFetcher waits for the timeout and no data in buffer, @@ -254,10 +250,8 @@ public void wakeup() { private ScanRecords pollForFetches() { ScanRecords scanRecords = logFetcher.collectFetch(); - // Check polledBuckets() instead of isEmpty() so a progress-only poll (advanced - // nextLogOffsets with no materialized records) is returned to the caller rather - // than being re-collected, which would drop the nextLogOffset information. - if (!scanRecords.polledBuckets().isEmpty()) { + // Check buckets() (includes progress-only buckets). + if (!scanRecords.buckets().isEmpty()) { return scanRecords; } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java index 2f11202d4c..a46d10d932 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java @@ -25,7 +25,6 @@ import javax.annotation.Nullable; import java.util.Collections; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -44,22 +43,18 @@ public class ScanRecords implements Iterable { private final Map> records; - /** - * The next log offset (exclusive upper bound of consumed records) for each bucket polled in - * this round. Includes buckets that produced zero records but whose log offset advanced (e.g. - * empty WAL batches generated by the FIRST_ROW merge engine when the upserted key already - * exists). See FLUSS-2371. - */ - private final Map nextLogOffsets; + /** The exclusive upper bound of consumed offsets per polled bucket in this round. */ + private final Map lastConsumedOffsets; public ScanRecords(Map> records) { this(records, Collections.emptyMap()); } public ScanRecords( - Map> records, Map nextLogOffsets) { + Map> records, + Map lastConsumedOffsets) { this.records = records; - this.nextLogOffsets = nextLogOffsets; + this.lastConsumedOffsets = lastConsumedOffsets; } /** @@ -76,46 +71,23 @@ public List records(TableBucket scanBucket) { } /** - * Get the bucket ids which have records contained in this record set. - * - * @return the set of partitions with data in this record set (maybe empty if no data was - * returned) + * Get the bucket ids that were polled in this round, including buckets whose record list is + * empty but whose log offset still advanced. */ public Set buckets() { return Collections.unmodifiableSet(records.keySet()); } /** - * Get all bucket ids that were polled in this round, including buckets whose record list is - * empty but whose log offset still advanced (e.g. empty WAL batches produced by the FIRST_ROW - * merge engine). Batch readers that need to detect end-of-range based on offset progress (such - * as the tiering service) should iterate over this set instead of {@link #buckets()}. - * - * @return the union of buckets exposed via {@link #buckets()} and buckets that only have an - * advanced {@code nextLogOffset}. - */ - public Set polledBuckets() { - if (nextLogOffsets.isEmpty()) { - return buckets(); - } - Set all = new HashSet<>(records.keySet()); - all.addAll(nextLogOffsets.keySet()); - return Collections.unmodifiableSet(all); - } - - /** - * The next log offset that the scanner will fetch from for the given bucket in subsequent - * polls. This offset is always advanced past empty WAL batches even when no {@link ScanRecord} - * is materialized for the bucket, which allows callers (e.g. the tiering service) to detect - * end-of-range correctly even under the FIRST_ROW merge engine. + * Get the exclusive upper bound of offsets consumed for the given bucket in this poll round. * * @param bucket the bucket to query - * @return the exclusive upper bound offset consumed in this round, or {@code null} if the - * bucket was not polled in this round. + * @return the exclusive upper bound offset, or {@code null} if the bucket was not polled in + * this round */ @Nullable - public Long nextLogOffset(TableBucket bucket) { - return nextLogOffsets.get(bucket); + public Long lastConsumedOffset(TableBucket bucket) { + return lastConsumedOffsets.get(bucket); } /** The number of records for all buckets. */ @@ -129,7 +101,7 @@ public int count() { /** Returns {@code true} if this {@code ScanRecords} contains no materialized records. */ public boolean isEmpty() { - return records.isEmpty(); + return count() == 0; } @Override diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java index a9f0d55b65..c7cce75c2a 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java @@ -258,12 +258,9 @@ void testFilteredEmptyResponseAdvancesOffset() { assertThat(scanRecords.records(tb)).isEmpty(); assertThat(logScannerStatus.getBucketOffset(tb)).isEqualTo(20L); assertThat(completedFetch.isConsumed()).isTrue(); - // Although the materialized record list is empty, the bucket must be exposed - // through polledBuckets() with an advanced nextLogOffset so that batch readers - // (e.g. the tiering service) can detect end-of-range. See issue #2371. - assertThat(scanRecords.buckets()).doesNotContain(tb); - assertThat(scanRecords.polledBuckets()).contains(tb); - assertThat(scanRecords.nextLogOffset(tb)).isEqualTo(20L); + // Empty record list, but bucket exposed via buckets() with an advanced lastConsumedOffset. + assertThat(scanRecords.buckets()).contains(tb); + assertThat(scanRecords.lastConsumedOffset(tb)).isEqualTo(20L); } private DefaultCompletedFetch makeCompletedFetch( diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java index b6894ea341..a71d3467b0 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java @@ -60,9 +60,9 @@ void iterator() { assertThat(c).isEqualTo(4); } - /** Verifies the legacy single-arg constructor leaves {@code nextLogOffset} undefined. */ + /** Verifies the legacy single-arg constructor leaves {@code lastConsumedOffset} undefined. */ @Test - void legacyConstructorHasNoNextLogOffset() { + void legacyConstructorHasNoLastConsumedOffset() { TableBucket tb = new TableBucket(0L, 0); Map> records = new HashMap<>(); records.put(tb, Collections.emptyList()); @@ -70,18 +70,15 @@ void legacyConstructorHasNoNextLogOffset() { ScanRecords scanRecords = new ScanRecords(records); assertThat(scanRecords.buckets()).containsExactly(tb); - assertThat(scanRecords.polledBuckets()).containsExactly(tb); - assertThat(scanRecords.nextLogOffset(tb)).isNull(); + assertThat(scanRecords.lastConsumedOffset(tb)).isNull(); } /** - * Verifies that buckets which only produced empty WAL batches (no {@link ScanRecord}) are - * exposed via {@link ScanRecords#polledBuckets()} and {@link ScanRecords#nextLogOffset}, while - * the legacy {@link ScanRecords#buckets()} keeps its old semantics. This is the API surface - * relied on by the tiering service to fix issue #2371. + * Verifies buckets with only advanced offsets are still exposed via {@link + * ScanRecords#buckets()} and carry their {@code lastConsumedOffset}. */ @Test - void emptyBucketIsExposedViaPolledBuckets() { + void emptyBucketIsExposedViaBuckets() { TableBucket bucketWithRecords = new TableBucket(0L, 0); TableBucket emptyBucket = new TableBucket(0L, 1); @@ -90,49 +87,48 @@ void emptyBucketIsExposedViaPolledBuckets() { bucketWithRecords, Collections.singletonList( new ScanRecord(5L, 1000L, ChangeType.INSERT, row(1, "a")))); + // Empty-progress buckets also appear in records (as emptyList). + records.put(emptyBucket, Collections.emptyList()); - Map nextOffsets = new HashMap<>(); - nextOffsets.put(bucketWithRecords, 6L); - nextOffsets.put(emptyBucket, 10L); + Map lastConsumedOffsets = new HashMap<>(); + lastConsumedOffsets.put(bucketWithRecords, 6L); + lastConsumedOffsets.put(emptyBucket, 10L); - ScanRecords scanRecords = new ScanRecords(records, nextOffsets); + ScanRecords scanRecords = new ScanRecords(records, lastConsumedOffsets); - assertThat(scanRecords.buckets()).containsExactly(bucketWithRecords); - assertThat(scanRecords.polledBuckets()) - .containsExactlyInAnyOrder(bucketWithRecords, emptyBucket); + assertThat(scanRecords.buckets()).containsExactlyInAnyOrder(bucketWithRecords, emptyBucket); assertThat(scanRecords.records(emptyBucket)).isEmpty(); - assertThat(scanRecords.nextLogOffset(bucketWithRecords)).isEqualTo(6L); - assertThat(scanRecords.nextLogOffset(emptyBucket)).isEqualTo(10L); - assertThat(scanRecords.nextLogOffset(new TableBucket(0L, 99))).isNull(); + assertThat(scanRecords.lastConsumedOffset(bucketWithRecords)).isEqualTo(6L); + assertThat(scanRecords.lastConsumedOffset(emptyBucket)).isEqualTo(10L); + assertThat(scanRecords.lastConsumedOffset(new TableBucket(0L, 99))).isNull(); } /** - * Verifies that {@link ScanRecords#isEmpty()} reflects only whether materialized records are - * present, regardless of advanced {@code nextLogOffsets}. The "progress-only" gating used by - * {@link LogScannerImpl#poll} (regression guard for issue #2371) is built on top of {@link - * ScanRecords#polledBuckets()} instead, so the public {@code isEmpty()} contract stays - * compatible with callers that treat empty as "no records to consume". + * Verifies {@link ScanRecords#isEmpty()} reflects only materialized records, regardless of + * advanced offsets. */ @Test void isEmptyReflectsOnlyMaterializedRecords() { TableBucket tb = new TableBucket(0L, 0); - // Both records and nextLogOffsets are empty -> empty. + // No records and no progress: both isEmpty() and buckets() must be empty. ScanRecords trulyEmpty = ScanRecords.EMPTY; assertThat(trulyEmpty.isEmpty()).isTrue(); - assertThat(trulyEmpty.polledBuckets()).isEmpty(); + assertThat(trulyEmpty.buckets()).isEmpty(); - // Records empty, but a bucket advanced its next fetch offset -> still isEmpty(), - // but polledBuckets() must surface the advanced bucket so LogScannerImpl can - // forward the progress to its caller. + // Progress-only round: isEmpty() must stay true (no materialized records), + // while buckets() must still expose the advanced bucket for callers to detect. + Map> emptyRecords = new HashMap<>(); + emptyRecords.put(tb, Collections.emptyList()); Map progressOnly = new HashMap<>(); progressOnly.put(tb, 42L); - ScanRecords progressOnlyRecords = new ScanRecords(Collections.emptyMap(), progressOnly); + ScanRecords progressOnlyRecords = new ScanRecords(emptyRecords, progressOnly); assertThat(progressOnlyRecords.isEmpty()).isTrue(); - assertThat(progressOnlyRecords.polledBuckets()).containsExactly(tb); - assertThat(progressOnlyRecords.nextLogOffset(tb)).isEqualTo(42L); + assertThat(progressOnlyRecords.buckets()).containsExactly(tb); + assertThat(progressOnlyRecords.lastConsumedOffset(tb)).isEqualTo(42L); - // Records non-empty -> NOT empty. + // Materialized records present: isEmpty() flips to false; legacy single-arg ctor still + // works. Map> records = new HashMap<>(); records.put( tb, @@ -140,6 +136,6 @@ void isEmptyReflectsOnlyMaterializedRecords() { new ScanRecord(0L, 1000L, ChangeType.INSERT, row(1, "a")))); ScanRecords withRecords = new ScanRecords(records); assertThat(withRecords.isEmpty()).isFalse(); - assertThat(withRecords.polledBuckets()).containsExactly(tb); + assertThat(withRecords.buckets()).containsExactly(tb); } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java index 429eea5480..73f4ead211 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java @@ -355,12 +355,8 @@ private RecordsWithSplitIds> forLogRecords( Map> writeResults = new HashMap<>(); Map finishedSplitIds = new HashMap<>(); - // Iterate polledBuckets() (instead of buckets()) so that buckets which only produced - // empty WAL batches (e.g. duplicate upserts under the FIRST_ROW merge engine) are - // also visited. Otherwise the tiering service would loop forever on such buckets - // because their log offset has advanced but no ScanRecord materialized. See - // https://github.com/apache/fluss/issues/2371. - for (TableBucket bucket : scanRecords.polledBuckets()) { + // Iterate every polled bucket, including those that only advanced their offset. + for (TableBucket bucket : scanRecords.buckets()) { // no any stopping offset, just skip handle the records for the bucket Long stoppingOffset = currentTableStoppingOffsets.get(bucket); if (stoppingOffset == null) { @@ -368,18 +364,20 @@ private RecordsWithSplitIds> forLogRecords( } List bucketScanRecords = scanRecords.records(bucket); - Long nextLogOffset = scanRecords.nextLogOffset(bucket); + ScanRecord lastRecord = null; LakeWriter lakeWriter = null; if (!bucketScanRecords.isEmpty()) { for (ScanRecord record : bucketScanRecords) { - // if record is less than stopping offset + // only tier records within the split range [start, stoppingOffset). if (record.logOffset() < stoppingOffset) { if (lakeWriter == null) { lakeWriter = getOrCreateLakeWriter( bucket, - currentTableSplitsByBucket.get(bucket).getPartitionName()); + currentTableSplitsByBucket + .get(bucket) + .getPartitionName()); } lakeWriter.write(record); if (record.getSizeInBytes() > 0) { @@ -387,24 +385,33 @@ private RecordsWithSplitIds> forLogRecords( } } } - ScanRecord lastRecord = bucketScanRecords.get(bucketScanRecords.size() - 1); - currentTableTieredOffsetAndTimestamp.put( - bucket, - new LogOffsetAndTimestamp(lastRecord.logOffset(), lastRecord.timestamp())); + lastRecord = bucketScanRecords.get(bucketScanRecords.size() - 1); } - // Decide whether the split has reached its end. Prefer the scanner-reported - // nextLogOffset because it advances even when the bucket only produced empty - // batches; fall back to the last record offset for backwards compatibility. - boolean reachedEnd; - if (nextLogOffset != null) { - reachedEnd = nextLogOffset >= stoppingOffset; - } else if (!bucketScanRecords.isEmpty()) { - ScanRecord lastRecord = bucketScanRecords.get(bucketScanRecords.size() - 1); - reachedEnd = lastRecord.logOffset() >= stoppingOffset - 1; + // Prefer the scanner-reported lastConsumedOffset; fall back to the last record offset. + Long lastConsumedOffset = scanRecords.lastConsumedOffset(bucket); + boolean reachedEnd = + lastConsumedOffset != null + ? lastConsumedOffset >= stoppingOffset + : lastRecord != null && lastRecord.logOffset() >= stoppingOffset - 1; + + if (!reachedEnd && lastRecord == null) { + continue; + } + + // When reachedEnd, the tiered offset is stoppingOffset - 1 (exclusive bound). + // Timestamp prefers the current record, then the previously tracked value. + long tieredOffset = reachedEnd ? stoppingOffset - 1 : lastRecord.logOffset(); + long tieredTimestamp; + if (lastRecord != null) { + tieredTimestamp = lastRecord.timestamp(); } else { - reachedEnd = false; + LogOffsetAndTimestamp latest = currentTableTieredOffsetAndTimestamp.get(bucket); + tieredTimestamp = latest != null ? latest.timestamp : UNKNOWN_BUCKET_TIMESTAMP; } + currentTableTieredOffsetAndTimestamp.put( + bucket, new LogOffsetAndTimestamp(tieredOffset, tieredTimestamp)); + if (!reachedEnd) { continue; } @@ -418,18 +425,13 @@ private RecordsWithSplitIds> forLogRecords( } TieringSplit currentTieringSplit = currentTableSplitsByBucket.remove(bucket); String currentSplitId = currentTieringSplit.splitId(); - // Use the latest known record timestamp for the bucket; fall back to - // UNKNOWN_BUCKET_TIMESTAMP when the split only contained empty batches and we - // never observed a real record. - LogOffsetAndTimestamp latest = currentTableTieredOffsetAndTimestamp.get(bucket); - long finishTimestamp = latest != null ? latest.timestamp : UNKNOWN_BUCKET_TIMESTAMP; writeResults.put( bucket, completeLakeWriter( bucket, currentTieringSplit.getPartitionName(), stoppingOffset, - finishTimestamp)); + tieredTimestamp)); finishedSplitIds.put(bucket, currentSplitId); LOG.info( "Finish tier bucket {} for table {}, split: {}.", diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java index 2cf0a9dfee..9aad5d1c3d 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java @@ -339,19 +339,8 @@ connection, new ThrowOnEmptyCompleteLakeTieringFactory())) { } /** - * Regression test for Issue #2371: - * tiering service used to hang forever when the source table uses {@code first_row} merge - * engine and contains duplicate keys. - * - *

Root cause: when {@code MergeEngineType=FIRST_ROW} drops a duplicate upsert, the server - * still appends an "empty" WAL batch (recordCount=0) so the log offset advances. The client's - * {@code LogFetchCollector} previously only exposed buckets with non-empty record lists, so the - * {@link org.apache.fluss.client.table.scanner.log.ScanRecords} returned to {@code - * TieringSplitReader#forLogRecords} contained no entry for such buckets and the underlying - * {@code nextFetchOffset} was invisible to the tiering layer. - * - *

The fix exposes per-bucket {@code nextLogOffset} via {@code ScanRecords}, allowing {@code - * forLogRecords} to detect end-of-range even when only empty batches were observed. + * Verifies that the tiering service finishes under {@code first_row} merge engine even when + * duplicate upserts produce empty WAL batches. */ @Test void testTieringFirstRowMergeEngineFinishes() throws Exception { @@ -364,10 +353,8 @@ void testTieringFirstRowMergeEngineFinishes() throws Exception { .build(); long tableId = createTable(tablePath, descriptor); - // Write the same primary keys multiple times, flushing after EACH round so - // the writes hit the server as separate batches. Under FIRST_ROW only the - // first upsert per id produces a CDC record; subsequent writes for already - // existing keys become empty WAL batches that still bump the log offset. + // Duplicate upserts under FIRST_ROW: only the first per id yields a CDC + // record, the rest become empty WAL batches that still advance the offset. int distinctKeys = 5; int duplicatesPerKey = 10; try (Table table = conn.getTable(tablePath)) { @@ -381,8 +368,6 @@ void testTieringFirstRowMergeEngineFinishes() throws Exception { } // Build log splits whose stoppingOffset equals the leader's current logEndOffset. - // For at least one bucket logEndOffset > number of CDC records, which is the - // condition that triggers the hang. List logSplits = new ArrayList<>(); Set splitIds = new HashSet<>(); long totalLogEndOffset = 0L; @@ -400,9 +385,8 @@ void testTieringFirstRowMergeEngineFinishes() throws Exception { splitIds.add(split.splitId()); } assertThat(logSplits).isNotEmpty(); - // Sanity-check the precondition for the bug: total log offsets across buckets - // must exceed the number of distinct-key CDC records, otherwise no empty - // batch was produced and the bug cannot manifest. + // Pre-condition: total log offsets must exceed distinct-key count, otherwise + // no empty batch was produced. assertThat(totalLogEndOffset) .as( "Expected logEndOffset (%d) to exceed distinctKeys (%d) so that " @@ -417,8 +401,7 @@ void testTieringFirstRowMergeEngineFinishes() throws Exception { createTieringReader(connection)) { tieringSplitReader.handleSplitsChanges(new SplitsAddition<>(logSplits)); - // Drive the reader. With the fix in place every split must finish within a - // few fetch rounds even though most polled batches are empty under FIRST_ROW. + // With the fix every split must finish within a few fetch rounds. Set finished = new HashSet<>(); int maxRounds = 10; for (int i = 0; i < maxRounds && !finished.containsAll(splitIds); i++) { @@ -436,7 +419,7 @@ void testTieringFirstRowMergeEngineFinishes() throws Exception { assertThat(finished) .as( "All tiering splits must finish under FIRST_ROW merge engine " - + "with duplicate keys (issue #2371). Finished: %s, expected: %s", + + "with duplicate keys. Finished: %s, expected: %s", finished, splitIds) .containsAll(splitIds); } From a39b8fed0e288e0d3983f81e11b5d63183d9b4a0 Mon Sep 17 00:00:00 2001 From: duankaixuan <1417048384@qq.com> Date: Sun, 10 May 2026 22:41:36 +0800 Subject: [PATCH 5/5] Adapt LogFetcherITCase to ScanRecords#buckets() polled-set semantics --- .../fluss/client/table/scanner/log/LogFetcherITCase.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java index 50addfcfe0..147be7922c 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java @@ -149,7 +149,10 @@ void testFetchWithSchemaChange() throws Exception { assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(2); }); ScanRecords records = logFetcher.collectFetch(); - assertThat(records.buckets().size()).isEqualTo(1); + // Both polled buckets are exposed; tb1 was polled but produced no records. + TableBucket tb1 = new TableBucket(tableId, bucketId1); + assertThat(records.buckets()).containsExactlyInAnyOrder(tb0, tb1); + assertThat(records.records(tb1)).isEmpty(); List scanRecords = records.records(tb0); assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList())) .isEqualTo(expectedRows); @@ -195,7 +198,8 @@ void testFetchWithSchemaChange() throws Exception { assertThat(newSchemaLogFetcher.getCompletedFetchesSize()).isEqualTo(2); }); records = newSchemaLogFetcher.collectFetch(); - assertThat(records.buckets().size()).isEqualTo(1); + assertThat(records.buckets()).containsExactlyInAnyOrder(tb0, tb1); + assertThat(records.records(tb1)).isEmpty(); assertThat(records.records(tb0)).hasSize(20); scanRecords = records.records(tb0); assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList()))