@@ -354,27 +354,27 @@ private RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> forLogRecords(
             ScanRecords scanRecords) throws IOException {
         Map<TableBucket, TableBucketWriteResult<WriteResult>> writeResults = new HashMap<>();
         Map<TableBucket, String> finishedSplitIds = new HashMap<>();
-        LOG.info("for log records to tier table {}.", currentTableId);

         for (TableBucket bucket : scanRecords.buckets()) {
-            LOG.info("tiering table bucket {}.", bucket);
             List<ScanRecord> bucketScanRecords = scanRecords.records(bucket);
             if (bucketScanRecords.isEmpty()) {
                 continue;
             }
-            LOG.info("tiering table bucket is not empty {}.", bucket);
             // no stopping offset for this bucket, so skip handling its records
             Long stoppingOffset = currentTableStoppingOffsets.get(bucket);
             if (stoppingOffset == null) {
                 continue;
             }
-            LOG.info("tiering table bucket stoppingOffset is not empty {}.", bucket);
-            LakeWriter<WriteResult> lakeWriter =
-                    getOrCreateLakeWriter(
-                            bucket, currentTableSplitsByBucket.get(bucket).getPartitionName());
+            LakeWriter<WriteResult> lakeWriter = null;
             for (ScanRecord record : bucketScanRecords) {
                 // only write records below the stopping offset
                 if (record.logOffset() < stoppingOffset) {
+                    if (lakeWriter == null) {
+                        lakeWriter =
+                                getOrCreateLakeWriter(
+                                        bucket,
+                                        currentTableSplitsByBucket.get(bucket).getPartitionName());
+                    }
                     lakeWriter.write(record);
                     if (record.getSizeInBytes() > 0) {
                         tieringMetrics.recordBytesRead(record.getSizeInBytes());
@@ -510,11 +510,14 @@ private TableBucketWriteResultWithSplitIds finishCurrentSnapshotSplit() throws IOException {

     private TableBucketWriteResultWithSplitIds forSnapshotSplitRecords(
             TableBucket bucket, CloseableIterator<RecordAndPos> recordIterator) throws IOException {
-        LakeWriter<WriteResult> lakeWriter =
-                getOrCreateLakeWriter(
-                        bucket, checkNotNull(currentSnapshotSplit).getPartitionName());
+        LakeWriter<WriteResult> lakeWriter = null;
         while (recordIterator.hasNext()) {
             ScanRecord scanRecord = recordIterator.next().record();
+            if (lakeWriter == null) {
+                lakeWriter =
+                        getOrCreateLakeWriter(
+                                bucket, checkNotNull(currentSnapshotSplit).getPartitionName());
+            }
             lakeWriter.write(scanRecord);
             if (scanRecord.getSizeInBytes() > 0) {
                 tieringMetrics.recordBytesRead(scanRecord.getSizeInBytes());
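
Both hunks above make the same change: the LakeWriter for a bucket or snapshot split is no longer created eagerly up front, but lazily on the first record that actually needs writing (for log splits, the first record below the stopping offset). If nothing in the fetched range is writable, no writer is ever created, so complete() can never be invoked on a writer that received zero records. A minimal, self-contained sketch of the idiom in plain Java; the names LazyWriter, Writer, and completeOrNull are illustrative, not Fluss API:

import java.io.IOException;
import java.util.function.Supplier;

// Sketch of the lazy-creation guard introduced in both hunks: the delegate
// writer only comes into existence on the first real write, and completing a
// never-used wrapper yields null instead of an empty complete() call.
final class LazyWriter<R> {

    interface Writer<T> {
        void write(String record) throws IOException;

        T complete() throws IOException;
    }

    private final Supplier<Writer<R>> factory; // stands in for getOrCreateLakeWriter
    private Writer<R> delegate;

    LazyWriter(Supplier<Writer<R>> factory) {
        this.factory = factory;
    }

    void write(String record) throws IOException {
        if (delegate == null) {
            delegate = factory.get(); // created on the first record only
        }
        delegate.write(record);
    }

    // Mirrors the behavior the test below asserts: a split with no writable
    // records yields a null write result rather than an empty complete().
    R completeOrNull() throws IOException {
        return delegate == null ? null : delegate.complete();
    }
}

Under this guard, a writer like the test's ThrowOnEmptyCompleteLakeWriter is simply never constructed for an empty range, and the caller observes a null result instead of an exception.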
@@ -31,8 +31,12 @@
 import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit;
 import org.apache.fluss.flink.tiering.source.split.TieringSplit;
 import org.apache.fluss.flink.utils.FlinkTestBase;
+import org.apache.fluss.lake.writer.LakeWriter;
+import org.apache.fluss.lake.writer.WriterInitContext;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.LogRecord;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.encode.CompactedKeyEncoder;

@@ -279,6 +283,58 @@ void testTieringMixTables() throws Exception {
         }
     }

+    @Test
+    void testLogSplitWithoutWritableRecordsCanCompleteLakeWriter() throws Exception {
+        TablePath tablePath = TablePath.of("fluss", "tiering_table_without_writable_records");
+        TableDescriptor singleBucketPkTableDescriptor =
+                TableDescriptor.builder()
+                        .schema(DEFAULT_PK_TABLE_SCHEMA)
+                        .distributedBy(1, "id")
+                        .build();
+        long tableId = createTable(tablePath, singleBucketPkTableDescriptor);
+        try (Connection connection =
+                        ConnectionFactory.createConnection(
+                                FLUSS_CLUSTER_EXTENSION.getClientConfig());
+                Table table = connection.getTable(tablePath);
+                TieringSplitReader<TestingWriteResult> tieringSplitReader =
+                        createTieringReader(
+                                connection, new ThrowOnEmptyCompleteLakeTieringFactory())) {
+            int key1 = 1;
+            int key2 = 2;
+            int bucket = 0;
+            TableBucket tableBucket = new TableBucket(tableId, bucket);
+
+            UpsertWriter upsertWriter = table.newUpsert().createWriter();
+            // produces log offset 0
+            upsertWriter.upsert(row(key1, "v1")).get();
+            // Deleting a non-existent key still advances the log offset, but it does not
+            // produce any tierable record; produces log offset 1
+            upsertWriter.delete(row(key2, (Object) null)).get();
+            // produces log offset 2
+            upsertWriter.upsert(row(key2, "v2")).get();
+
+            long startingOffset = 1;
+            long stoppingOffset = 2;
+            TieringLogSplit tieringLogSplit =
+                    new TieringLogSplit(
+                            tablePath, tableBucket, null, startingOffset, stoppingOffset, 1);
+
+            // The custom factory fails if complete() is called on a writer that never
+            // received any record, which captures the regression this test covers.
+            tieringSplitReader.handleSplitsChanges(
+                    new SplitsAddition<TieringSplit>(Collections.singletonList(tieringLogSplit)));
+
+            RecordsWithSplitIds<TableBucketWriteResult<TestingWriteResult>> result =
+                    tieringSplitReader.fetch();
+
+            assertThat(result.nextSplit()).isEqualTo(tieringLogSplit.splitId());
+            TableBucketWriteResult<TestingWriteResult> writeResult = result.nextRecordFromSplit();
+            assertThat(writeResult).isNotNull();
+            // expect a null write result since no records were written
+            assertThat(writeResult.writeResult()).isNull();
+        }
+    }
+
     private TieringSplitReader<TestingWriteResult> createTieringReader(Connection connection) {
         final TieringMetrics tieringMetrics =
                 new TieringMetrics(
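
The delete of a non-existent key is the crux of the test above: it consumes log offset 1 without emitting a change event, so the split [startingOffset = 1, stoppingOffset = 2) fetches a log range that contains nothing tierable. A self-contained walk-through of that offset arithmetic in plain Java; NoOpDeleteOffsetDemo and its helpers are illustrative only, not Fluss API:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative model of the test's three operations: every operation consumes
// a log offset, but only operations that change table state emit a change event.
public final class NoOpDeleteOffsetDemo {

    record Event(long offset, String change) {}

    public static void main(String[] args) {
        Map<Integer, String> table = new HashMap<>();
        List<Event> changeLog = new ArrayList<>();
        long offset = 0;

        offset = upsert(table, changeLog, offset, 1, "v1"); // offset 0 -> +I(1, v1)
        offset = delete(table, changeLog, offset, 2);       // offset 1 -> no event
        offset = upsert(table, changeLog, offset, 2, "v2"); // offset 2 -> +I(2, v2)

        // The split [1, 2) covers exactly the no-op delete, so the tiering
        // reader fetches the range but never hands a record to the writer.
        long tierable = changeLog.stream().filter(e -> e.offset() >= 1 && e.offset() < 2).count();
        System.out.println("tierable records in [1, 2): " + tierable); // prints 0
    }

    static long upsert(Map<Integer, String> t, List<Event> log, long offset, int key, String v) {
        t.put(key, v);
        log.add(new Event(offset, "+I(" + key + ", " + v + ")"));
        return offset + 1;
    }

    static long delete(Map<Integer, String> t, List<Event> log, long offset, int key) {
        if (t.remove(key) != null) {
            log.add(new Event(offset, "-D(" + key + ")"));
        }
        // absent key: the offset still advances, but no change event is emitted
        return offset + 1;
    }
}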
@@ -288,6 +344,15 @@ private TieringSplitReader<TestingWriteResult> createTieringReader(Connection connection) {
                 connection, new TestingLakeTieringFactory(), tieringMetrics);
     }

+    private TieringSplitReader<TestingWriteResult> createTieringReader(
+            Connection connection, TestingLakeTieringFactory lakeTieringFactory) {
+        final TieringMetrics tieringMetrics =
+                new TieringMetrics(
+                        InternalSourceReaderMetricGroup.mock(
+                                new MetricListener().getMetricGroup()));
+        return new TieringSplitReader<>(connection, lakeTieringFactory, tieringMetrics);
+    }
+
     private void verifyTieringRows(
             TieringSplitReader<TestingWriteResult> tieringSplitReader,
             long tableId,
@@ -384,4 +449,34 @@ private static int getBucketId(InternalRow row) {
         HashBucketAssigner hashBucketAssigner = new HashBucketAssigner(DEFAULT_BUCKET_NUM);
         return hashBucketAssigner.assignBucket(key);
     }
+
+    private static class ThrowOnEmptyCompleteLakeTieringFactory extends TestingLakeTieringFactory {
+
+        @Override
+        public LakeWriter<TestingWriteResult> createLakeWriter(WriterInitContext writerInitContext)
+                throws IOException {
+            return new ThrowOnEmptyCompleteLakeWriter();
+        }
+    }
+
+    private static class ThrowOnEmptyCompleteLakeWriter implements LakeWriter<TestingWriteResult> {
+
+        private int writtenRecords;
+
+        @Override
+        public void write(LogRecord record) throws IOException {
+            writtenRecords++;
+        }
+
+        @Override
+        public TestingWriteResult complete() throws IOException {
+            if (writtenRecords == 0) {
+                throw new IOException("complete called without any written records");
+            }
+            return new TestingWriteResult(writtenRecords);
+        }
+
+        @Override
+        public void close() throws IOException {}
+    }
 }