diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java
index 05c43eca08..13b41e3e3c 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java
@@ -77,14 +77,13 @@ public LogFetchCollector(
     /**
      * Return the fetched log records, empty the record buffer and update the consumed position.
      *
-     * <p>NOTE: returning empty records guarantees the consumed position are NOT updated.
-     *
      * @return The fetched records per partition
      * @throws LogOffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
      *     the defaultResetPolicy is NONE
      */
     public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) {
         Map<TableBucket, List<ScanRecord>> fetched = new HashMap<>();
+        Map<TableBucket, Long> lastConsumedOffsets = new HashMap<>();
         int recordsRemaining = maxPollRecords;
 
         try {
@@ -120,8 +119,11 @@ public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) {
                     logFetchBuffer.poll();
                 } else {
                     List<ScanRecord> records = fetchRecords(nextInLineFetch, recordsRemaining);
+                    TableBucket tableBucket = nextInLineFetch.tableBucket;
+                    // Always record the advanced next fetch offset for this bucket, even when
+                    // the materialized record list is empty.
+                    lastConsumedOffsets.put(tableBucket, nextInLineFetch.nextFetchOffset());
                     if (!records.isEmpty()) {
-                        TableBucket tableBucket = nextInLineFetch.tableBucket;
                         List<ScanRecord> currentRecords = fetched.get(tableBucket);
                         if (currentRecords == null) {
                             fetched.put(tableBucket, records);
@@ -147,7 +149,11 @@ public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) {
             }
         }
 
-        return new ScanRecords(fetched);
+        // Ensure every polled bucket appears in fetched so that buckets() reflects the polled set.
+        for (TableBucket polled : lastConsumedOffsets.keySet()) {
+            fetched.putIfAbsent(polled, Collections.emptyList());
+        }
+        return new ScanRecords(fetched, lastConsumedOffsets);
     }
 
     private List<ScanRecord> fetchRecords(CompletedFetch nextInLineFetch, int maxRecords) {
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
index 9a2dbf0b4c..0ee14a78bd 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
@@ -142,7 +142,8 @@ public ScanRecords poll(Duration timeout) {
         long startNanos = System.nanoTime();
         do {
             ScanRecords scanRecords = pollForFetches();
-            if (scanRecords.isEmpty()) {
+            // Gate on buckets() rather than isEmpty() so progress-only polls reach the caller.
+            if (scanRecords.buckets().isEmpty()) {
                 try {
                     if (!logFetcher.awaitNotEmpty(startNanos + timeoutNanos)) {
                         // logFetcher waits for the timeout and no data in buffer,
@@ -249,7 +250,8 @@ public void wakeup() {
 
     private ScanRecords pollForFetches() {
         ScanRecords scanRecords = logFetcher.collectFetch();
-        if (!scanRecords.isEmpty()) {
+        // Check buckets() (includes progress-only buckets).
+        if (!scanRecords.buckets().isEmpty()) {
             return scanRecords;
         }
 
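Note: the two client-side changes above redefine the poll contract. A round can now be progress-only (no materialized rows, but an advanced consumed offset), and such rounds must reach the caller. Below is a minimal caller-side sketch of the intended usage; it is not part of this patch, `process` and `commitProgress` are hypothetical application callbacks, and the import paths are assumptions.

import java.time.Duration;

import org.apache.fluss.client.table.scanner.ScanRecord;
import org.apache.fluss.client.table.scanner.log.LogScanner;
import org.apache.fluss.client.table.scanner.log.ScanRecords;
import org.apache.fluss.metadata.TableBucket;

// Sketch: consume rows and commit progress per polled bucket. A progress-only
// round has an empty record list but a non-null, advanced lastConsumedOffset.
final class ProgressAwarePoller {

    void pollOnce(LogScanner scanner) {
        ScanRecords scanRecords = scanner.poll(Duration.ofSeconds(1));
        for (TableBucket bucket : scanRecords.buckets()) {
            for (ScanRecord record : scanRecords.records(bucket)) {
                process(record); // may not run at all for progress-only buckets
            }
            Long upTo = scanRecords.lastConsumedOffset(bucket);
            if (upTo != null) {
                commitProgress(bucket, upTo); // exclusive upper bound
            }
        }
    }

    private void process(ScanRecord record) {
        // hypothetical application logic
    }

    private void commitProgress(TableBucket bucket, long exclusiveUpperBound) {
        // hypothetical checkpoint/commit logic
    }
}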
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java
index 9d58c22b49..a46d10d932 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java
@@ -22,6 +22,8 @@
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.utils.AbstractIterator;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -41,8 +43,18 @@ public class ScanRecords implements Iterable<ScanRecord> {
 
     private final Map<TableBucket, List<ScanRecord>> records;
 
+    /** The exclusive upper bound of consumed offsets per polled bucket in this round. */
+    private final Map<TableBucket, Long> lastConsumedOffsets;
+
     public ScanRecords(Map<TableBucket, List<ScanRecord>> records) {
+        this(records, Collections.emptyMap());
+    }
+
+    public ScanRecords(
+            Map<TableBucket, List<ScanRecord>> records,
+            Map<TableBucket, Long> lastConsumedOffsets) {
         this.records = records;
+        this.lastConsumedOffsets = lastConsumedOffsets;
     }
 
     /**
@@ -59,15 +71,25 @@ public List<ScanRecord> records(TableBucket scanBucket) {
     }
 
     /**
-     * Get the bucket ids which have records contained in this record set.
-     *
-     * @return the set of partitions with data in this record set (maybe empty if no data was
-     *     returned)
+     * Get the bucket ids that were polled in this round, including buckets whose record list is
+     * empty but whose log offset still advanced.
      */
     public Set<TableBucket> buckets() {
         return Collections.unmodifiableSet(records.keySet());
     }
 
+    /**
+     * Get the exclusive upper bound of offsets consumed for the given bucket in this poll round.
+     *
+     * @param bucket the bucket to query
+     * @return the exclusive upper bound offset, or {@code null} if the bucket was not polled in
+     *     this round
+     */
+    @Nullable
+    public Long lastConsumedOffset(TableBucket bucket) {
+        return lastConsumedOffsets.get(bucket);
+    }
+
     /** The number of records for all buckets. */
     public int count() {
         int count = 0;
@@ -77,8 +99,9 @@ public int count() {
         return count;
     }
 
+    /** Returns {@code true} if this {@code ScanRecords} contains no materialized records. */
     public boolean isEmpty() {
-        return records.isEmpty();
+        return count() == 0;
     }
 
     @Override
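Note: a concrete reading of the new contract, with made-up offsets: a fetch that materialized offsets 10..19 for a bucket reports lastConsumedOffset == 20 (the exclusive bound), while a progress-only fetch over empty batches reports an empty record list with the bound still advancing. The sketch below constructs both shapes directly; it is illustrative only and the import paths are assumptions.

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.fluss.client.table.scanner.ScanRecord;
import org.apache.fluss.client.table.scanner.log.ScanRecords;
import org.apache.fluss.metadata.TableBucket;

// Illustrative construction of the two shapes a poll round can now take.
final class ScanRecordsShapes {

    public static void main(String[] args) {
        TableBucket tb = new TableBucket(1L, 0);

        // Progress-only round: the bucket was polled, no rows materialized,
        // but the exclusive consumed bound advanced to 25.
        Map<TableBucket, List<ScanRecord>> records = new HashMap<>();
        records.put(tb, Collections.emptyList());
        Map<TableBucket, Long> offsets = new HashMap<>();
        offsets.put(tb, 25L);
        ScanRecords progressOnly = new ScanRecords(records, offsets);

        assert progressOnly.isEmpty(); // no materialized records
        assert progressOnly.buckets().contains(tb); // but tb was polled
        assert progressOnly.lastConsumedOffset(tb) == 25L;

        // Legacy single-arg constructor: offset information is simply absent.
        ScanRecords legacy = new ScanRecords(records);
        assert legacy.lastConsumedOffset(tb) == null;
    }
}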
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java
index e769ebf4ee..c7cce75c2a 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java
@@ -258,6 +258,9 @@ void testFilteredEmptyResponseAdvancesOffset() {
         assertThat(scanRecords.records(tb)).isEmpty();
         assertThat(logScannerStatus.getBucketOffset(tb)).isEqualTo(20L);
         assertThat(completedFetch.isConsumed()).isTrue();
+        // Empty record list, but bucket exposed via buckets() with an advanced lastConsumedOffset.
+        assertThat(scanRecords.buckets()).contains(tb);
+        assertThat(scanRecords.lastConsumedOffset(tb)).isEqualTo(20L);
     }
 
     private DefaultCompletedFetch makeCompletedFetch(
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java
index 50addfcfe0..147be7922c 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java
@@ -149,7 +149,10 @@ void testFetchWithSchemaChange() throws Exception {
                     assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(2);
                 });
         ScanRecords records = logFetcher.collectFetch();
-        assertThat(records.buckets().size()).isEqualTo(1);
+        // Both polled buckets are exposed; tb1 was polled but produced no records.
+        TableBucket tb1 = new TableBucket(tableId, bucketId1);
+        assertThat(records.buckets()).containsExactlyInAnyOrder(tb0, tb1);
+        assertThat(records.records(tb1)).isEmpty();
         List<ScanRecord> scanRecords = records.records(tb0);
         assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList()))
                 .isEqualTo(expectedRows);
@@ -195,7 +198,8 @@ void testFetchWithSchemaChange() throws Exception {
                     assertThat(newSchemaLogFetcher.getCompletedFetchesSize()).isEqualTo(2);
                 });
         records = newSchemaLogFetcher.collectFetch();
-        assertThat(records.buckets().size()).isEqualTo(1);
+        assertThat(records.buckets()).containsExactlyInAnyOrder(tb0, tb1);
+        assertThat(records.records(tb1)).isEmpty();
         assertThat(records.records(tb0)).hasSize(20);
         scanRecords = records.records(tb0);
         assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList()))
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java
index db4a326676..a71d3467b0 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java
@@ -25,6 +25,8 @@
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -57,4 +59,83 @@ void iterator() {
         }
         assertThat(c).isEqualTo(4);
     }
+
+    /** Verifies the legacy single-arg constructor leaves {@code lastConsumedOffset} undefined. */
+    @Test
+    void legacyConstructorHasNoLastConsumedOffset() {
+        TableBucket tb = new TableBucket(0L, 0);
+        Map<TableBucket, List<ScanRecord>> records = new HashMap<>();
+        records.put(tb, Collections.emptyList());
+
+        ScanRecords scanRecords = new ScanRecords(records);
+
+        assertThat(scanRecords.buckets()).containsExactly(tb);
+        assertThat(scanRecords.lastConsumedOffset(tb)).isNull();
+    }
+
+    /**
+     * Verifies buckets with only advanced offsets are still exposed via {@link
+     * ScanRecords#buckets()} and carry their {@code lastConsumedOffset}.
+     */
+    @Test
+    void emptyBucketIsExposedViaBuckets() {
+        TableBucket bucketWithRecords = new TableBucket(0L, 0);
+        TableBucket emptyBucket = new TableBucket(0L, 1);
+
+        Map<TableBucket, List<ScanRecord>> records = new HashMap<>();
+        records.put(
+                bucketWithRecords,
+                Collections.singletonList(
+                        new ScanRecord(5L, 1000L, ChangeType.INSERT, row(1, "a"))));
+        // Empty-progress buckets also appear in records (as emptyList).
+        records.put(emptyBucket, Collections.emptyList());
+
+        Map<TableBucket, Long> lastConsumedOffsets = new HashMap<>();
+        lastConsumedOffsets.put(bucketWithRecords, 6L);
+        lastConsumedOffsets.put(emptyBucket, 10L);
+
+        ScanRecords scanRecords = new ScanRecords(records, lastConsumedOffsets);
+
+        assertThat(scanRecords.buckets()).containsExactlyInAnyOrder(bucketWithRecords, emptyBucket);
+        assertThat(scanRecords.records(emptyBucket)).isEmpty();
+        assertThat(scanRecords.lastConsumedOffset(bucketWithRecords)).isEqualTo(6L);
+        assertThat(scanRecords.lastConsumedOffset(emptyBucket)).isEqualTo(10L);
+        assertThat(scanRecords.lastConsumedOffset(new TableBucket(0L, 99))).isNull();
+    }
+
+    /**
+     * Verifies {@link ScanRecords#isEmpty()} reflects only materialized records, regardless of
+     * advanced offsets.
+     */
+    @Test
+    void isEmptyReflectsOnlyMaterializedRecords() {
+        TableBucket tb = new TableBucket(0L, 0);
+
+        // No records and no progress: both isEmpty() and buckets() must be empty.
+        ScanRecords trulyEmpty = ScanRecords.EMPTY;
+        assertThat(trulyEmpty.isEmpty()).isTrue();
+        assertThat(trulyEmpty.buckets()).isEmpty();
+
+        // Progress-only round: isEmpty() must stay true (no materialized records),
+        // while buckets() must still expose the advanced bucket for callers to detect.
+        Map<TableBucket, List<ScanRecord>> emptyRecords = new HashMap<>();
+        emptyRecords.put(tb, Collections.emptyList());
+        Map<TableBucket, Long> progressOnly = new HashMap<>();
+        progressOnly.put(tb, 42L);
+        ScanRecords progressOnlyRecords = new ScanRecords(emptyRecords, progressOnly);
+        assertThat(progressOnlyRecords.isEmpty()).isTrue();
+        assertThat(progressOnlyRecords.buckets()).containsExactly(tb);
+        assertThat(progressOnlyRecords.lastConsumedOffset(tb)).isEqualTo(42L);
+
+        // Materialized records present: isEmpty() flips to false; legacy single-arg ctor still
+        // works.
+        Map<TableBucket, List<ScanRecord>> records = new HashMap<>();
+        records.put(
+                tb,
+                Collections.singletonList(
+                        new ScanRecord(0L, 1000L, ChangeType.INSERT, row(1, "a"))));
+        ScanRecords withRecords = new ScanRecords(records);
+        assertThat(withRecords.isEmpty()).isFalse();
+        assertThat(withRecords.buckets()).containsExactly(tb);
+    }
 }
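Note: the tiering change below is easiest to read against the failure it fixes. With the old completion check, a split whose tail consists only of empty WAL batches never finishes, because completion was derived from the last materialized record. A self-contained worked example with made-up offsets:

// A split covers [0, 50); the last materialized record sits at offset 4 and
// offsets 5..49 are empty batches (e.g. duplicate upserts under FIRST_ROW).
final class ReachedEndArithmetic {

    public static void main(String[] args) {
        long stoppingOffset = 50L; // exclusive end of the split
        Long lastRecordOffset = 4L; // last materialized record, may be null
        Long lastConsumedOffset = 50L; // scanner progress, exclusive bound

        // Old check: stuck forever, since no further record will materialize.
        boolean oldReachedEnd = lastRecordOffset >= stoppingOffset - 1; // false

        // New check: progress-only polls can complete the split; the record
        // based comparison remains as a fallback for legacy ScanRecords.
        boolean newReachedEnd =
                lastConsumedOffset != null
                        ? lastConsumedOffset >= stoppingOffset
                        : lastRecordOffset != null
                                && lastRecordOffset >= stoppingOffset - 1; // true

        System.out.println(oldReachedEnd + " -> " + newReachedEnd);
    }
}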
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
index d59787e15d..73f4ead211 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
@@ -355,63 +355,89 @@ private RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> forLogRecords(
         Map<TableBucket, TableBucketWriteResult<WriteResult>> writeResults = new HashMap<>();
         Map<TableBucket, String> finishedSplitIds = new HashMap<>();
 
+        // Iterate every polled bucket, including those that only advanced their offset.
         for (TableBucket bucket : scanRecords.buckets()) {
-            List<ScanRecord> bucketScanRecords = scanRecords.records(bucket);
-            if (bucketScanRecords.isEmpty()) {
-                continue;
-            }
             // no any stopping offset, just skip handle the records for the bucket
             Long stoppingOffset = currentTableStoppingOffsets.get(bucket);
             if (stoppingOffset == null) {
                 continue;
             }
+
+            List<ScanRecord> bucketScanRecords = scanRecords.records(bucket);
+            ScanRecord lastRecord = null;
             LakeWriter<WriteResult> lakeWriter = null;
-            for (ScanRecord record : bucketScanRecords) {
-                // if record is less than stopping offset
-                if (record.logOffset() < stoppingOffset) {
-                    if (lakeWriter == null) {
-                        lakeWriter =
-                                getOrCreateLakeWriter(
-                                        bucket,
-                                        currentTableSplitsByBucket.get(bucket).getPartitionName());
-                    }
-                    lakeWriter.write(record);
-                    if (record.getSizeInBytes() > 0) {
-                        tieringMetrics.recordBytesRead(record.getSizeInBytes());
+
+            if (!bucketScanRecords.isEmpty()) {
+                for (ScanRecord record : bucketScanRecords) {
+                    // only tier records within the split range [start, stoppingOffset).
+                    if (record.logOffset() < stoppingOffset) {
+                        if (lakeWriter == null) {
+                            lakeWriter =
+                                    getOrCreateLakeWriter(
+                                            bucket,
+                                            currentTableSplitsByBucket
+                                                    .get(bucket)
+                                                    .getPartitionName());
+                        }
+                        lakeWriter.write(record);
+                        if (record.getSizeInBytes() > 0) {
+                            tieringMetrics.recordBytesRead(record.getSizeInBytes());
+                        }
                     }
                 }
+                lastRecord = bucketScanRecords.get(bucketScanRecords.size() - 1);
+            }
+
+            // Prefer the scanner-reported lastConsumedOffset; fall back to the last record offset.
+            Long lastConsumedOffset = scanRecords.lastConsumedOffset(bucket);
+            boolean reachedEnd =
+                    lastConsumedOffset != null
+                            ? lastConsumedOffset >= stoppingOffset
+                            : lastRecord != null && lastRecord.logOffset() >= stoppingOffset - 1;
+
+            if (!reachedEnd && lastRecord == null) {
+                continue;
+            }
+
+            // When reachedEnd, the last tiered offset is stoppingOffset - 1 (the stopping offset
+            // itself is exclusive). The timestamp prefers the current record, then the previously
+            // tracked value.
+            long tieredOffset = reachedEnd ? stoppingOffset - 1 : lastRecord.logOffset();
+            long tieredTimestamp;
+            if (lastRecord != null) {
+                tieredTimestamp = lastRecord.timestamp();
+            } else {
+                LogOffsetAndTimestamp latest = currentTableTieredOffsetAndTimestamp.get(bucket);
+                tieredTimestamp = latest != null ? latest.timestamp : UNKNOWN_BUCKET_TIMESTAMP;
             }
-            ScanRecord lastRecord = bucketScanRecords.get(bucketScanRecords.size() - 1);
             currentTableTieredOffsetAndTimestamp.put(
-                    bucket,
-                    new LogOffsetAndTimestamp(lastRecord.logOffset(), lastRecord.timestamp()));
-            // has arrived into the end of the split,
-            if (lastRecord.logOffset() >= stoppingOffset - 1) {
-                currentTableStoppingOffsets.remove(bucket);
-                if (bucket.getPartitionId() != null) {
-                    currentLogScanner.unsubscribe(bucket.getPartitionId(), bucket.getBucket());
-                } else {
-                    // todo: should unsubscribe the log split if unsubscribe bucket for
-                    // un-partitioned table is supported
-                }
-                TieringSplit currentTieringSplit = currentTableSplitsByBucket.remove(bucket);
-                String currentSplitId = currentTieringSplit.splitId();
-                // put write result of the bucket
-                writeResults.put(
-                        bucket,
-                        completeLakeWriter(
-                                bucket,
-                                currentTieringSplit.getPartitionName(),
-                                stoppingOffset,
-                                lastRecord.timestamp()));
-                // put split of the bucket
-                finishedSplitIds.put(bucket, currentSplitId);
-                LOG.info(
-                        "Finish tier bucket {} for table {}, split: {}.",
-                        bucket,
-                        currentTablePath,
-                        currentSplitId);
+                    bucket, new LogOffsetAndTimestamp(tieredOffset, tieredTimestamp));
+
+            if (!reachedEnd) {
+                continue;
             }
+
+            currentTableStoppingOffsets.remove(bucket);
+            if (bucket.getPartitionId() != null) {
+                currentLogScanner.unsubscribe(bucket.getPartitionId(), bucket.getBucket());
+            } else {
+                // todo: should unsubscribe the log split if unsubscribe bucket for
+                // un-partitioned table is supported
+            }
+            TieringSplit currentTieringSplit = currentTableSplitsByBucket.remove(bucket);
+            String currentSplitId = currentTieringSplit.splitId();
+            writeResults.put(
+                    bucket,
+                    completeLakeWriter(
+                            bucket,
+                            currentTieringSplit.getPartitionName(),
+                            stoppingOffset,
+                            tieredTimestamp));
+            finishedSplitIds.put(bucket, currentSplitId);
+            LOG.info(
+                    "Finish tier bucket {} for table {}, split: {}.",
+                    bucket,
+                    currentTablePath,
+                    currentSplitId);
         }
 
         if (!finishedSplitIds.isEmpty()) {
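Note: the test below provokes exactly that log shape. Conceptually (a hedged fragment mirroring the test's helpers and table, not standalone code), duplicate upserts on a FIRST_ROW table are dropped by the merge engine yet still append empty WAL batches that advance the bucket's log end offset:

// Assuming a FIRST_ROW table with primary key `id`, as created in the test:
try (Table table = conn.getTable(TablePath.of("fluss", "tiering_first_row_finish"))) {
    UpsertWriter writer = table.newUpsert().createWriter();
    writer.upsert(row(1, "first")); // materialized: one CDC insert record
    writer.upsert(row(1, "again")); // dropped by FIRST_ROW, but the WAL still
                                    // appends an empty batch and the bucket's
                                    // log end offset keeps advancing
    writer.flush();
}
// A tiering split stopping at the current log end offset therefore ends in a
// region with no materialized records at all.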
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java
index 171f521e00..9aad5d1c3d 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java
@@ -24,6 +24,7 @@
 import org.apache.fluss.client.table.writer.TableWriter;
 import org.apache.fluss.client.table.writer.UpsertWriter;
 import org.apache.fluss.client.write.HashBucketAssigner;
+import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.flink.tiering.TestingLakeTieringFactory;
 import org.apache.fluss.flink.tiering.TestingWriteResult;
 import org.apache.fluss.flink.tiering.source.metrics.TieringMetrics;
@@ -33,12 +34,14 @@
 import org.apache.fluss.flink.utils.FlinkTestBase;
 import org.apache.fluss.lake.writer.LakeWriter;
 import org.apache.fluss.lake.writer.WriterInitContext;
+import org.apache.fluss.metadata.MergeEngineType;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.record.LogRecord;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.encode.CompactedKeyEncoder;
+import org.apache.fluss.server.replica.Replica;
 
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@@ -335,6 +338,93 @@ connection, new ThrowOnEmptyCompleteLakeTieringFactory())) {
         }
     }
 
+    /**
+     * Verifies that the tiering service finishes under {@code first_row} merge engine even when
+     * duplicate upserts produce empty WAL batches.
+     */
+    @Test
+    void testTieringFirstRowMergeEngineFinishes() throws Exception {
+        TablePath tablePath = TablePath.of("fluss", "tiering_first_row_finish");
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(DEFAULT_PK_TABLE_SCHEMA)
+                        .distributedBy(DEFAULT_BUCKET_NUM, "id")
+                        .property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngineType.FIRST_ROW)
+                        .build();
+        long tableId = createTable(tablePath, descriptor);
+
+        // Duplicate upserts under FIRST_ROW: only the first per id yields a CDC
+        // record, the rest become empty WAL batches that still advance the offset.
+        int distinctKeys = 5;
+        int duplicatesPerKey = 10;
+        try (Table table = conn.getTable(tablePath)) {
+            for (int round = 0; round < duplicatesPerKey; round++) {
+                UpsertWriter writer = table.newUpsert().createWriter();
+                for (int id = 0; id < distinctKeys; id++) {
+                    writer.upsert(row(id, "v" + round));
+                }
+                writer.flush();
+            }
+        }
+
+        // Build log splits whose stoppingOffset equals the leader's current logEndOffset.
+        List<TieringSplit> logSplits = new ArrayList<>();
+        Set<String> splitIds = new HashSet<>();
+        long totalLogEndOffset = 0L;
+        for (int bucket = 0; bucket < DEFAULT_BUCKET_NUM; bucket++) {
+            TableBucket tb = new TableBucket(tableId, bucket);
+            Replica leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb);
+            long stoppingOffset = leader.getLogTablet().localLogEndOffset();
+            totalLogEndOffset += stoppingOffset;
+            if (stoppingOffset <= 0) {
+                continue;
+            }
+            TieringLogSplit split =
+                    createLogSplit(tablePath, tableId, bucket, EARLIEST_OFFSET, stoppingOffset);
+            logSplits.add(split);
+            splitIds.add(split.splitId());
+        }
+        assertThat(logSplits).isNotEmpty();
+        // Pre-condition: total log offsets must exceed the distinct-key count, otherwise
+        // no empty batch was produced.
+        assertThat(totalLogEndOffset)
+                .as(
+                        "Expected logEndOffset (%d) to exceed distinctKeys (%d) so that "
+                                + "empty batches are produced under FIRST_ROW",
+                        totalLogEndOffset, distinctKeys)
+                .isGreaterThan(distinctKeys);
+
+        try (Connection connection =
+                        ConnectionFactory.createConnection(
+                                FLUSS_CLUSTER_EXTENSION.getClientConfig());
+                TieringSplitReader<TestingWriteResult> tieringSplitReader =
+                        createTieringReader(connection)) {
+            tieringSplitReader.handleSplitsChanges(new SplitsAddition<>(logSplits));
+
+            // With the fix every split must finish within a few fetch rounds.
+            Set<String> finished = new HashSet<>();
+            int maxRounds = 10;
+            for (int i = 0; i < maxRounds && !finished.containsAll(splitIds); i++) {
+                RecordsWithSplitIds<TableBucketWriteResult<TestingWriteResult>> fetchResult =
+                        tieringSplitReader.fetch();
+                finished.addAll(fetchResult.finishedSplits());
+                // drain the iterator so that the reader advances internal state
+                while (fetchResult.nextSplit() != null) {
+                    while (fetchResult.nextRecordFromSplit() != null) {
+                        // consume
+                    }
+                }
+            }
+
+            assertThat(finished)
+                    .as(
+                            "All tiering splits must finish under FIRST_ROW merge engine "
+                                    + "with duplicate keys. Finished: %s, expected: %s",
+                            finished, splitIds)
+                    .containsAll(splitIds);
+        }
+    }
+
     private TieringSplitReader<TestingWriteResult> createTieringReader(Connection connection) {
         final TieringMetrics tieringMetrics = new TieringMetrics(