
[client] Fix tiering hang on first_row merge engine empty batches#3242

Open
Kaixuan-Duan wants to merge 3 commits into apache:main from Kaixuan-Duan:issue-2371-tiering-stuck

Conversation

@Kaixuan-Duan
Contributor

Purpose

Linked issue: close #2371

Brief change log

  • fluss-client
    • ScanRecords: add a Map<TableBucket, Long> nextLogOffsets field with a new two-arg constructor (legacy single-arg constructor preserved for backwards compatibility); expose two new accessors:
      • polledBuckets() — union of buckets that produced records and buckets that only advanced their next fetch offset.
      • nextLogOffset(bucket) — exclusive upper bound of consumed offsets in this poll round, or null if the bucket was not polled.
    • LogFetchCollector#collectFetch: always record the advanced nextFetchOffset per polled bucket, even when the materialized record list is empty, and pack it into the new ScanRecords constructor.
  • fluss-flink-common
    • TieringSplitReader#forLogRecords: iterate scanRecords.polledBuckets() instead of scanRecords.buckets(). Determine end-of-range by comparing the scanner-reported nextLogOffset against the bucket's stoppingOffset (with the legacy lastRecord.logOffset() >= stoppingOffset - 1 check kept as a fallback for callers that don't supply nextLogOffset). Tolerate splits that finish with no real record observed by falling back to UNKNOWN_BUCKET_TIMESTAMP when computing the finish timestamp.
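The two new accessors described above can be illustrated with a minimal sketch. The types below are hypothetical stand-ins for the real Fluss classes (TableBucket is modeled as a plain String, records as offset lists), not the actual implementation:

```java
import java.util.*;

// Hypothetical stand-in for the described ScanRecords change; the real
// Fluss classes differ. A bucket is modeled as a String here.
class ScanRecordsSketch {
    private final Map<String, List<Long>> records;   // bucket -> materialized record offsets
    private final Map<String, Long> nextLogOffsets;  // bucket -> exclusive next fetch offset

    ScanRecordsSketch(Map<String, List<Long>> records, Map<String, Long> nextLogOffsets) {
        this.records = records;
        this.nextLogOffsets = nextLogOffsets;
    }

    // Union of buckets that produced records and buckets that only
    // advanced their next fetch offset (e.g. via empty WAL batches).
    Set<String> polledBuckets() {
        Set<String> all = new HashSet<>(records.keySet());
        all.addAll(nextLogOffsets.keySet());
        return Collections.unmodifiableSet(all);
    }

    // Exclusive upper bound of offsets consumed in this poll round,
    // or null if the bucket was not polled.
    Long nextLogOffset(String bucket) {
        return nextLogOffsets.get(bucket);
    }

    public static void main(String[] args) {
        // Bucket "b0" returned records; "b1" only advanced its offset.
        ScanRecordsSketch sr = new ScanRecordsSketch(
                Map.of("b0", List.of(10L, 11L)),
                Map.of("b0", 12L, "b1", 20L));
        System.out.println(sr.polledBuckets().contains("b1")); // true
        System.out.println(sr.nextLogOffset("b1"));            // 20
        System.out.println(sr.nextLogOffset("b2"));            // null
    }
}
```

Under this model, a bucket whose poll round saw only empty batches still shows up in polledBuckets() with an advanced offset, which is exactly what the tiering reader needs to observe progress.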

Tests

./mvnw -pl fluss-client,fluss-flink/fluss-flink-common \
-Dtest='ScanRecordsTest,LogFetchCollectorTest,TieringSplitReaderTest#testTieringFirstRowMergeEngineFinishes' \
-DfailIfNoTests=false test

API and Format

Documentation

@Kaixuan-Duan
Contributor Author

@luoyuxia @zuston Hi, I have tried to resolve issue #2371. PTAL


Copilot AI left a comment


Pull request overview

This PR addresses a tiering hang for tables using the FIRST_ROW merge engine where the log offset can advance via “empty” WAL batches (recordCount=0) even when no ScanRecord is materialized, causing the tiering layer to repeatedly poll the same range (Issue #2371).

Changes:

  • Extend fluss-client ScanRecords to carry per-bucket nextLogOffsets, and expose polledBuckets() + nextLogOffset(bucket) so callers can observe offset progress even when no records were produced.
  • Update LogFetchCollector to always record nextFetchOffset for each polled bucket and construct ScanRecords with these offsets.
  • Update Flink TieringSplitReader#forLogRecords to iterate polledBuckets() and determine end-of-range using nextLogOffset (with fallback to last-record checks), plus add a regression test reproducing Issue #2371.
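The end-of-range decision described in the last bullet can be sketched as follows. The method and parameter names are hypothetical stand-ins for the values the tiering reader sees, not the actual Fluss code:

```java
// Sketch of the described end-of-range check (hypothetical names).
// stoppingOffset is exclusive; nextFetchOffset, when available, is the
// exclusive upper bound of offsets consumed in this poll round.
class ReachedEndSketch {
    static boolean reachedEnd(Long nextFetchOffset, long stoppingOffset, Long lastRecordOffset) {
        if (nextFetchOffset != null) {
            // Advances even when the poll round only observed empty WAL batches.
            return nextFetchOffset >= stoppingOffset;
        }
        // Legacy fallback for callers that do not supply nextFetchOffset.
        return lastRecordOffset != null && lastRecordOffset >= stoppingOffset - 1;
    }

    public static void main(String[] args) {
        // Empty batches advanced the offset up to the stopping offset: finished.
        System.out.println(reachedEnd(100L, 100L, null)); // true
        // Without nextFetchOffset and no records, the split cannot finish,
        // which is the hang described in Issue #2371.
        System.out.println(reachedEnd(null, 100L, null)); // false
    }
}
```

The second call in main demonstrates the pre-fix behavior: with no materialized record and no reported next offset, the reader keeps re-polling the same range.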

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java Adds regression test to ensure tiering completes under FIRST_ROW with duplicate keys/empty batches.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java Uses polledBuckets() and nextLogOffset to finish splits even when only empty batches occur.
fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java Adds unit tests for legacy constructor behavior and new polledBuckets()/nextLogOffset semantics.
fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java Verifies empty/filtered responses still expose offset advancement via polledBuckets()/nextLogOffset.
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java Introduces nextLogOffsets, new constructor, polledBuckets(), and nextLogOffset(bucket).
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java Always records advanced nextFetchOffset per polled bucket and returns it in ScanRecords.


Comment on lines +97 to +104
public Set<TableBucket> polledBuckets() {
    if (nextLogOffsets.isEmpty()) {
        return buckets();
    }
    Set<TableBucket> all = new HashSet<>(records.keySet());
    all.addAll(nextLogOffsets.keySet());
    return Collections.unmodifiableSet(all);
}
Comment on lines +88 to +92
// Tracks the next fetch offset for every bucket polled in this round, even when the
// returned record list is empty (e.g. empty WAL batches produced by the FIRST_ROW
// merge engine, see issue #2371). This lets callers (such as the tiering service)
// detect that the log offset has advanced past empty batches.
Map<TableBucket, Long> nextOffsets = new HashMap<>();
@Kaixuan-Duan
Contributor Author

@luoyuxia Hi, I have addressed the feedback. PTAL

@luoyuxia
Contributor

luoyuxia commented May 8, 2026

@Kaixuan-Duan Thanks for the PR. Will take a look when I get some time.


@luoyuxia left a comment


@Kaixuan-Duan Thanks for the PR. Left minor comments. PTAL

* Return the fetched log records, empty the record buffer and update the consumed position.
*
* <p>NOTE: returning empty records guarantees the consumed position are NOT updated.
* <p>The returned {@link ScanRecords#records(TableBucket)} may be empty for a bucket even when

The comments look too long to me; overly long comments may break the reader's attention. I think we can revert these changes, as the added comments may not be important.

* empty WAL batches generated by the FIRST_ROW merge engine when the upserted key already
* exists). See <a href="https://github.com/apache/fluss/issues/2371">FLUSS-2371</a>.
*/
private final Map<TableBucket, Long> nextLogOffsets;

nextLogOffsets -> lastConsumedOffsets?
Also comment that it's an exclusive offset.

* @return the union of buckets exposed via {@link #buckets()} and buckets that only have an
* advanced {@code nextLogOffset}.
*/
public Set<TableBucket> polledBuckets() {

I'm wondering whether we can just remove this method.
Could we put empty records (no actual record, but with an offset advance) into records?

*/
@Nullable
public Long nextLogOffset(TableBucket bucket) {
return nextLogOffsets.get(bucket);

What if the bucket was not polled in this round?
If it is not polled in this round, will it be put into nextLogOffsets?

}

/**
* Regression test for <a href="https://github.com/apache/fluss/issues/2371">Issue #2371</a>:

This comment is too long to me.

continue;
}
LOG.info("tiering table bucket is not empty {}.", bucket);
// Iterate polledBuckets() (instead of buckets()) so that buckets which only produced

I'm wondering if we can change it like the following, which I think may be clearer:

for (TableBucket bucket : scanRecords.polledBuckets()) {
            LOG.info("tiering table bucket {}.", bucket);
            List<ScanRecord> bucketScanRecords = scanRecords.records(bucket);

            // no any stopping offset, just skip handle the records for the bucket
            Long stoppingOffset = currentTableStoppingOffsets.get(bucket);
            if (stoppingOffset == null) {
                continue;
            }
            LOG.info("tiering table bucket stoppingOffset is not empty {}.", bucket);

            ScanRecord lastRecord = null;
            if (!bucketScanRecords.isEmpty()) {
                LOG.info("tiering table bucket is not empty {}.", bucket);
                LakeWriter<WriteResult> lakeWriter =
                        getOrCreateLakeWriter(
                                bucket, currentTableSplitsByBucket.get(bucket).getPartitionName());
                for (ScanRecord record : bucketScanRecords) {
                    // Only tier records that are within the split range [start, stoppingOffset).
                    if (record.logOffset() < stoppingOffset) {
                        lakeWriter.write(record);
                        if (record.getSizeInBytes() > 0) {
                            tieringMetrics.recordBytesRead(record.getSizeInBytes());
                        }
                    }
                }
                lastRecord = bucketScanRecords.get(bucketScanRecords.size() - 1);
            }

            Long nextFetchOffset = scanRecords.nextLogOffset(bucket);

            // Prefer the scanner-reported next fetch offset because it advances even when this
            // poll round only observes empty WAL batches. Fall back to the last materialized
            // record offset for callers that do not provide nextFetchOffset.
            boolean reachedEnd =
                    nextFetchOffset != null
                            ? nextFetchOffset >= stoppingOffset
                            : lastRecord != null && lastRecord.logOffset() >= stoppingOffset - 1;
            if (!reachedEnd && lastRecord == null) {
                continue;
            }

            // Track the latest tiered offset/timestamp for this bucket.
            // Once the split reaches the end, the correct tiered offset is stoppingOffset - 1,
            // because stoppingOffset is exclusive and records at/after it are not part of this
            // split.
            long tieredOffset = reachedEnd ? stoppingOffset - 1 : lastRecord.logOffset();
            long tieredTimestamp;
            if (lastRecord != null) {
                tieredTimestamp = lastRecord.timestamp();
            } else {
                LogOffsetAndTimestamp latest = currentTableTieredOffsetAndTimestamp.get(bucket);
                tieredTimestamp = latest != null ? latest.timestamp : UNKNOWN_BUCKET_TIMESTAMP;
            }
            currentTableTieredOffsetAndTimestamp.put(
                    bucket, new LogOffsetAndTimestamp(tieredOffset, tieredTimestamp));

            if (!reachedEnd) {
                continue;
            }

            currentTableStoppingOffsets.remove(bucket);
            if (bucket.getPartitionId() != null) {
                currentLogScanner.unsubscribe(bucket.getPartitionId(), bucket.getBucket());
            } else {
                // todo: should unsubscribe the log split if unsubscribe bucket for
                // un-partitioned table is supported
            }
            TieringSplit currentTieringSplit = currentTableSplitsByBucket.remove(bucket);
            String currentSplitId = currentTieringSplit.splitId();
            writeResults.put(
                    bucket,
                    completeLakeWriter(
                            bucket,
                            currentTieringSplit.getPartitionName(),
                            stoppingOffset,
                            tieredTimestamp));
            finishedSplitIds.put(bucket, currentSplitId);
            LOG.info(
                    "Finish tier bucket {} for table {}, split: {}.",
                    bucket,
                    currentTablePath,
                    currentSplitId);
        }



Development

Successfully merging this pull request may close these issues.

Tiering may stuck for first row merge engine table
