From 8c5a448ae188d2e079fa9fe4c55077247157db62 Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Sat, 9 May 2026 15:16:32 +0800 Subject: [PATCH 1/2] [flink] Delay lake writer creation in tiering reader --- .../fluss/flink/tiering/source/TieringSplitReader.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java index 3a760110e5..59dbee7118 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java @@ -369,12 +369,16 @@ private RecordsWithSplitIds> forLogRecords( continue; } LOG.info("tiering table bucket stoppingOffset is not empty {}.", bucket); - LakeWriter lakeWriter = - getOrCreateLakeWriter( - bucket, currentTableSplitsByBucket.get(bucket).getPartitionName()); + LakeWriter lakeWriter = null; for (ScanRecord record : bucketScanRecords) { // if record is less than stopping offset if (record.logOffset() < stoppingOffset) { + if (lakeWriter == null) { + lakeWriter = + getOrCreateLakeWriter( + bucket, + currentTableSplitsByBucket.get(bucket).getPartitionName()); + } lakeWriter.write(record); if (record.getSizeInBytes() > 0) { tieringMetrics.recordBytesRead(record.getSizeInBytes()); From deb3be373228257280107508a647a4eb6b5ab16d Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Sat, 9 May 2026 17:38:06 +0800 Subject: [PATCH 2/2] address anton comments --- .../tiering/source/TieringSplitReader.java | 13 ++- .../source/TieringSplitReaderTest.java | 95 +++++++++++++++++++ 2 files changed, 101 insertions(+), 7 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java index 59dbee7118..d59787e15d 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java @@ -354,21 +354,17 @@ private RecordsWithSplitIds> forLogRecords( ScanRecords scanRecords) throws IOException { Map> writeResults = new HashMap<>(); Map finishedSplitIds = new HashMap<>(); - LOG.info("for log records to tier table {}.", currentTableId); for (TableBucket bucket : scanRecords.buckets()) { - LOG.info("tiering table bucket {}.", bucket); List bucketScanRecords = scanRecords.records(bucket); if (bucketScanRecords.isEmpty()) { continue; } - LOG.info("tiering table bucket is not empty {}.", bucket); // no any stopping offset, just skip handle the records for the bucket Long stoppingOffset = currentTableStoppingOffsets.get(bucket); if (stoppingOffset == null) { continue; } - LOG.info("tiering table bucket stoppingOffset is not empty {}.", bucket); LakeWriter lakeWriter = null; for (ScanRecord record : bucketScanRecords) { // if record is less than stopping offset @@ -514,11 +510,14 @@ private TableBucketWriteResultWithSplitIds finishCurrentSnapshotSplit() throws I private TableBucketWriteResultWithSplitIds forSnapshotSplitRecords( TableBucket bucket, CloseableIterator recordIterator) throws IOException { - LakeWriter lakeWriter = - getOrCreateLakeWriter( - bucket, checkNotNull(currentSnapshotSplit).getPartitionName()); + LakeWriter lakeWriter = null; while (recordIterator.hasNext()) { ScanRecord scanRecord = recordIterator.next().record(); + if (lakeWriter == null) { + lakeWriter = + getOrCreateLakeWriter( + bucket, checkNotNull(currentSnapshotSplit).getPartitionName()); + } lakeWriter.write(scanRecord); if (scanRecord.getSizeInBytes() > 0) { tieringMetrics.recordBytesRead(scanRecord.getSizeInBytes()); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java index 447ada7a7a..171f521e00 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java @@ -31,8 +31,12 @@ import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit; import org.apache.fluss.flink.tiering.source.split.TieringSplit; import org.apache.fluss.flink.utils.FlinkTestBase; +import org.apache.fluss.lake.writer.LakeWriter; +import org.apache.fluss.lake.writer.WriterInitContext; import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.record.LogRecord; import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.encode.CompactedKeyEncoder; @@ -279,6 +283,58 @@ void testTieringMixTables() throws Exception { } } + @Test + void testLogSplitWithoutWritableRecordsCanCompleteLakeWriter() throws Exception { + TablePath tablePath = TablePath.of("fluss", "tiering_table_without_writable_records"); + TableDescriptor singleBucketPkTableDescriptor = + TableDescriptor.builder() + .schema(DEFAULT_PK_TABLE_SCHEMA) + .distributedBy(1, "id") + .build(); + long tableId = createTable(tablePath, singleBucketPkTableDescriptor); + try (Connection connection = + ConnectionFactory.createConnection( + FLUSS_CLUSTER_EXTENSION.getClientConfig()); + Table table = connection.getTable(tablePath); + TieringSplitReader tieringSplitReader = + createTieringReader( + connection, new ThrowOnEmptyCompleteLakeTieringFactory())) { + int key1 = 1; + int key2 = 2; + int bucket = 0; + TableBucket tableBucket = new TableBucket(tableId, bucket); + + UpsertWriter upsertWriter = table.newUpsert().createWriter(); + // produce offset0 + upsertWriter.upsert(row(key1, "v1")).get(); + // Deleting a non-existent key still advances the log offset, but this range does not + // produce any tierable record. produce offset1 + upsertWriter.delete(row(key2, (Object) null)).get(); + // produce offset2 + upsertWriter.upsert(row(key2, "v2")).get(); + + long startingOffset = 1; + long stoppingOffset = 2; + TieringLogSplit tieringLogSplit = + new TieringLogSplit( + tablePath, tableBucket, null, startingOffset, stoppingOffset, 1); + + // The custom factory fails if complete() is called on a writer that never received any + // record, which captures the regression this test covers. + tieringSplitReader.handleSplitsChanges( + new SplitsAddition(Collections.singletonList(tieringLogSplit))); + + RecordsWithSplitIds> result = + tieringSplitReader.fetch(); + + assertThat(result.nextSplit()).isEqualTo(tieringLogSplit.splitId()); + TableBucketWriteResult writeResult = result.nextRecordFromSplit(); + assertThat(writeResult).isNotNull(); + // expect null write result since no any records written + assertThat(writeResult.writeResult()).isNull(); + } + } + private TieringSplitReader createTieringReader(Connection connection) { final TieringMetrics tieringMetrics = new TieringMetrics( @@ -288,6 +344,15 @@ private TieringSplitReader createTieringReader(Connection co connection, new TestingLakeTieringFactory(), tieringMetrics); } + private TieringSplitReader createTieringReader( + Connection connection, TestingLakeTieringFactory lakeTieringFactory) { + final TieringMetrics tieringMetrics = + new TieringMetrics( + InternalSourceReaderMetricGroup.mock( + new MetricListener().getMetricGroup())); + return new TieringSplitReader<>(connection, lakeTieringFactory, tieringMetrics); + } + private void verifyTieringRows( TieringSplitReader tieringSplitReader, long tableId, @@ -384,4 +449,34 @@ private static int getBucketId(InternalRow row) { HashBucketAssigner hashBucketAssigner = new HashBucketAssigner(DEFAULT_BUCKET_NUM); return hashBucketAssigner.assignBucket(key); } + + private static class ThrowOnEmptyCompleteLakeTieringFactory extends TestingLakeTieringFactory { + + @Override + public LakeWriter createLakeWriter(WriterInitContext writerInitContext) + throws IOException { + return new ThrowOnEmptyCompleteLakeWriter(); + } + } + + private static class ThrowOnEmptyCompleteLakeWriter implements LakeWriter { + + private int writtenRecords; + + @Override + public void write(LogRecord record) throws IOException { + writtenRecords++; + } + + @Override + public TestingWriteResult complete() throws IOException { + if (writtenRecords == 0) { + throw new IOException("complete called without any written records"); + } + return new TestingWriteResult(writtenRecords); + } + + @Override + public void close() throws IOException {} + } }