@@ -77,14 +77,13 @@ public LogFetchCollector(
/**
* Return the fetched log records, empty the record buffer and update the consumed position.
*
* <p>NOTE: returning empty records guarantees that the consumed position is NOT updated.
*
* @return The fetched records per partition
* @throws LogOffsetOutOfRangeException If there is an OffsetOutOfRange error in fetchResponse
*     and the defaultResetPolicy is NONE
*/
public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) {
Map<TableBucket, List<ScanRecord>> fetched = new HashMap<>();
Map<TableBucket, Long> lastConsumedOffsets = new HashMap<>();
int recordsRemaining = maxPollRecords;

try {
@@ -120,8 +119,11 @@ public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) {
logFetchBuffer.poll();
} else {
List<ScanRecord> records = fetchRecords(nextInLineFetch, recordsRemaining);
TableBucket tableBucket = nextInLineFetch.tableBucket;
// Always record the advanced next fetch offset for this bucket, even when
// the materialized record list is empty.
lastConsumedOffsets.put(tableBucket, nextInLineFetch.nextFetchOffset());
if (!records.isEmpty()) {
TableBucket tableBucket = nextInLineFetch.tableBucket;
List<ScanRecord> currentRecords = fetched.get(tableBucket);
if (currentRecords == null) {
fetched.put(tableBucket, records);
@@ -147,7 +149,11 @@ }
}
}

return new ScanRecords(fetched);
// Ensure every polled bucket appears in fetched so that buckets() reflects the polled set.
for (TableBucket polled : lastConsumedOffsets.keySet()) {
fetched.putIfAbsent(polled, Collections.emptyList());
}
return new ScanRecords(fetched, lastConsumedOffsets);
}

private List<ScanRecord> fetchRecords(CompletedFetch nextInLineFetch, int maxRecords) {
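A minimal sketch of the contract collectFetch now establishes (not part of the patch; the bucket and offset values are illustrative, the constructors are the ones exercised by the tests in this PR): every key of lastConsumedOffsets is also a key of fetched, so buckets() reflects the full polled set while isEmpty() still tracks only materialized records.

    Map<TableBucket, List<ScanRecord>> fetched = new HashMap<>();
    Map<TableBucket, Long> lastConsumedOffsets = new HashMap<>();
    TableBucket polledButEmpty = new TableBucket(1L, 0);
    // Offset advanced but no records were materialized (e.g. a fully filtered batch).
    lastConsumedOffsets.put(polledButEmpty, 20L);
    // Mirror of the putIfAbsent loop above: polled-but-empty buckets get an empty list.
    for (TableBucket polled : lastConsumedOffsets.keySet()) {
        fetched.putIfAbsent(polled, Collections.emptyList());
    }
    ScanRecords result = new ScanRecords(fetched, lastConsumedOffsets);
    // result.buckets() contains polledButEmpty; result.isEmpty() is still true.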
@@ -142,7 +142,8 @@ public ScanRecords poll(Duration timeout) {
long startNanos = System.nanoTime();
do {
ScanRecords scanRecords = pollForFetches();
if (scanRecords.isEmpty()) {
// Gate on buckets() rather than isEmpty() so progress-only polls reach the caller.
if (scanRecords.buckets().isEmpty()) {
try {
if (!logFetcher.awaitNotEmpty(startNanos + timeoutNanos)) {
// logFetcher waits for the timeout and no data in buffer,
@@ -249,7 +250,8 @@ public void wakeup() {

private ScanRecords pollForFetches() {
ScanRecords scanRecords = logFetcher.collectFetch();
if (!scanRecords.isEmpty()) {
// Check buckets() (includes progress-only buckets).
if (!scanRecords.buckets().isEmpty()) {
return scanRecords;
}

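A caller-side sketch of what the new gating enables, assuming a LogScanner whose poll(Duration) returns the ScanRecords built above; the logScanner variable, the committedOffsets map, and process(...) are hypothetical:

    Map<TableBucket, Long> committedOffsets = new HashMap<>();
    ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
    for (TableBucket bucket : scanRecords.buckets()) {
        for (ScanRecord record : scanRecords.records(bucket)) {
            process(record); // hypothetical record handler
        }
        // Progress is visible even for buckets whose record list is empty,
        // e.g. when a fully filtered batch only advanced the fetch offset.
        Long upTo = scanRecords.lastConsumedOffset(bucket);
        if (upTo != null) {
            committedOffsets.put(bucket, upTo); // exclusive upper bound
        }
    }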
@@ -22,6 +22,8 @@
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.utils.AbstractIterator;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -41,8 +43,18 @@ public class ScanRecords implements Iterable<ScanRecord> {

private final Map<TableBucket, List<ScanRecord>> records;

/** The exclusive upper bound of consumed offsets per polled bucket in this round. */
private final Map<TableBucket, Long> lastConsumedOffsets;

public ScanRecords(Map<TableBucket, List<ScanRecord>> records) {
this(records, Collections.emptyMap());
}

public ScanRecords(
Map<TableBucket, List<ScanRecord>> records,
Map<TableBucket, Long> lastConsumedOffsets) {
this.records = records;
this.lastConsumedOffsets = lastConsumedOffsets;
}

/**
@@ -59,15 +71,25 @@ public List<ScanRecord> records(TableBucket scanBucket) {
}

/**
* Get the bucket ids which have records contained in this record set.
*
* @return the set of partitions with data in this record set (may be empty if no data was
* returned)
* Get the bucket ids that were polled in this round, including buckets whose record list is
* empty but whose log offset still advanced.
*/
public Set<TableBucket> buckets() {
return Collections.unmodifiableSet(records.keySet());
}

/**
* Get the exclusive upper bound of offsets consumed for the given bucket in this poll round.
*
* @param bucket the bucket to query
* @return the exclusive upper bound offset, or {@code null} if the bucket was not polled in
* this round
*/
@Nullable
public Long lastConsumedOffset(TableBucket bucket) {
return lastConsumedOffsets.get(bucket);
}

/** The number of records for all buckets. */
public int count() {
int count = 0;
@@ -77,8 +99,9 @@ public int count() {
return count;
}

/** Returns {@code true} if this {@code ScanRecords} contains no materialized records. */
public boolean isEmpty() {
return records.isEmpty();
return count() == 0;
}

@Override
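One behavioral consequence worth noting: isEmpty() now answers "were any records materialized?" while buckets().isEmpty() answers "was anything polled at all?". A caller that previously used isEmpty() to detect a no-progress poll would switch roughly like this (sketch; scanRecords is any instance returned by a poll):

    boolean nothingPolled = scanRecords.buckets().isEmpty(); // no bucket made any progress
    boolean noData = scanRecords.isEmpty(); // no materialized records; offsets may still have advanced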
@@ -258,6 +258,9 @@ void testFilteredEmptyResponseAdvancesOffset() {
assertThat(scanRecords.records(tb)).isEmpty();
assertThat(logScannerStatus.getBucketOffset(tb)).isEqualTo(20L);
assertThat(completedFetch.isConsumed()).isTrue();
// Empty record list, but bucket exposed via buckets() with an advanced lastConsumedOffset.
assertThat(scanRecords.buckets()).contains(tb);
assertThat(scanRecords.lastConsumedOffset(tb)).isEqualTo(20L);
}

private DefaultCompletedFetch makeCompletedFetch(
@@ -149,7 +149,10 @@ void testFetchWithSchemaChange() throws Exception {
assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(2);
});
ScanRecords records = logFetcher.collectFetch();
assertThat(records.buckets().size()).isEqualTo(1);
// Both polled buckets are exposed; tb1 was polled but produced no records.
TableBucket tb1 = new TableBucket(tableId, bucketId1);
assertThat(records.buckets()).containsExactlyInAnyOrder(tb0, tb1);
assertThat(records.records(tb1)).isEmpty();
List<ScanRecord> scanRecords = records.records(tb0);
assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList()))
.isEqualTo(expectedRows);
@@ -195,7 +198,8 @@ void testFetchWithSchemaChange() throws Exception {
assertThat(newSchemaLogFetcher.getCompletedFetchesSize()).isEqualTo(2);
});
records = newSchemaLogFetcher.collectFetch();
assertThat(records.buckets().size()).isEqualTo(1);
assertThat(records.buckets()).containsExactlyInAnyOrder(tb0, tb1);
assertThat(records.records(tb1)).isEmpty();
assertThat(records.records(tb0)).hasSize(20);
scanRecords = records.records(tb0);
assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList()))
@@ -25,6 +25,8 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -57,4 +59,83 @@ void iterator() {
}
assertThat(c).isEqualTo(4);
}

/** Verifies the legacy single-arg constructor leaves {@code lastConsumedOffset} undefined. */
@Test
void legacyConstructorHasNoLastConsumedOffset() {
TableBucket tb = new TableBucket(0L, 0);
Map<TableBucket, List<ScanRecord>> records = new HashMap<>();
records.put(tb, Collections.emptyList());

ScanRecords scanRecords = new ScanRecords(records);

assertThat(scanRecords.buckets()).containsExactly(tb);
assertThat(scanRecords.lastConsumedOffset(tb)).isNull();
}

/**
* Verifies buckets with only advanced offsets are still exposed via {@link
* ScanRecords#buckets()} and carry their {@code lastConsumedOffset}.
*/
@Test
void emptyBucketIsExposedViaBuckets() {
TableBucket bucketWithRecords = new TableBucket(0L, 0);
TableBucket emptyBucket = new TableBucket(0L, 1);

Map<TableBucket, List<ScanRecord>> records = new HashMap<>();
records.put(
bucketWithRecords,
Collections.singletonList(
new ScanRecord(5L, 1000L, ChangeType.INSERT, row(1, "a"))));
// Empty-progress buckets also appear in records (as emptyList).
records.put(emptyBucket, Collections.emptyList());

Map<TableBucket, Long> lastConsumedOffsets = new HashMap<>();
lastConsumedOffsets.put(bucketWithRecords, 6L);
lastConsumedOffsets.put(emptyBucket, 10L);

ScanRecords scanRecords = new ScanRecords(records, lastConsumedOffsets);

assertThat(scanRecords.buckets()).containsExactlyInAnyOrder(bucketWithRecords, emptyBucket);
assertThat(scanRecords.records(emptyBucket)).isEmpty();
assertThat(scanRecords.lastConsumedOffset(bucketWithRecords)).isEqualTo(6L);
assertThat(scanRecords.lastConsumedOffset(emptyBucket)).isEqualTo(10L);
assertThat(scanRecords.lastConsumedOffset(new TableBucket(0L, 99))).isNull();
}

/**
* Verifies {@link ScanRecords#isEmpty()} reflects only materialized records, regardless of
* advanced offsets.
*/
@Test
void isEmptyReflectsOnlyMaterializedRecords() {
TableBucket tb = new TableBucket(0L, 0);

// No records and no progress: both isEmpty() and buckets() must be empty.
ScanRecords trulyEmpty = ScanRecords.EMPTY;
assertThat(trulyEmpty.isEmpty()).isTrue();
assertThat(trulyEmpty.buckets()).isEmpty();

// Progress-only round: isEmpty() must stay true (no materialized records),
// while buckets() must still expose the advanced bucket for callers to detect.
Map<TableBucket, List<ScanRecord>> emptyRecords = new HashMap<>();
emptyRecords.put(tb, Collections.emptyList());
Map<TableBucket, Long> progressOnly = new HashMap<>();
progressOnly.put(tb, 42L);
ScanRecords progressOnlyRecords = new ScanRecords(emptyRecords, progressOnly);
assertThat(progressOnlyRecords.isEmpty()).isTrue();
assertThat(progressOnlyRecords.buckets()).containsExactly(tb);
assertThat(progressOnlyRecords.lastConsumedOffset(tb)).isEqualTo(42L);

// Materialized records present: isEmpty() flips to false; legacy single-arg ctor still
// works.
Map<TableBucket, List<ScanRecord>> records = new HashMap<>();
records.put(
tb,
Collections.singletonList(
new ScanRecord(0L, 1000L, ChangeType.INSERT, row(1, "a"))));
ScanRecords withRecords = new ScanRecords(records);
assertThat(withRecords.isEmpty()).isFalse();
assertThat(withRecords.buckets()).containsExactly(tb);
}
}
@@ -355,63 +355,89 @@ private RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> forLogRecords(
Map<TableBucket, TableBucketWriteResult<WriteResult>> writeResults = new HashMap<>();
Map<TableBucket, String> finishedSplitIds = new HashMap<>();

// Iterate every polled bucket, including those that only advanced their offset.
for (TableBucket bucket : scanRecords.buckets()) {
List<ScanRecord> bucketScanRecords = scanRecords.records(bucket);
if (bucketScanRecords.isEmpty()) {
continue;
}
// no stopping offset, so skip handling the records for this bucket
Long stoppingOffset = currentTableStoppingOffsets.get(bucket);
if (stoppingOffset == null) {
continue;
}

List<ScanRecord> bucketScanRecords = scanRecords.records(bucket);
ScanRecord lastRecord = null;
LakeWriter<WriteResult> lakeWriter = null;
for (ScanRecord record : bucketScanRecords) {
// if record is less than stopping offset
if (record.logOffset() < stoppingOffset) {
if (lakeWriter == null) {
lakeWriter =
getOrCreateLakeWriter(
bucket,
currentTableSplitsByBucket.get(bucket).getPartitionName());
}
lakeWriter.write(record);
if (record.getSizeInBytes() > 0) {
tieringMetrics.recordBytesRead(record.getSizeInBytes());

if (!bucketScanRecords.isEmpty()) {
for (ScanRecord record : bucketScanRecords) {
// only tier records within the split range [start, stoppingOffset).
if (record.logOffset() < stoppingOffset) {
if (lakeWriter == null) {
lakeWriter =
getOrCreateLakeWriter(
bucket,
currentTableSplitsByBucket
.get(bucket)
.getPartitionName());
}
lakeWriter.write(record);
if (record.getSizeInBytes() > 0) {
tieringMetrics.recordBytesRead(record.getSizeInBytes());
}
}
}
lastRecord = bucketScanRecords.get(bucketScanRecords.size() - 1);
}

// Prefer the scanner-reported lastConsumedOffset; fall back to the last record offset.
Long lastConsumedOffset = scanRecords.lastConsumedOffset(bucket);
boolean reachedEnd =
lastConsumedOffset != null
? lastConsumedOffset >= stoppingOffset
: lastRecord != null && lastRecord.logOffset() >= stoppingOffset - 1;

if (!reachedEnd && lastRecord == null) {
continue;
}

// When reachedEnd, the tiered offset is stoppingOffset - 1 (stoppingOffset is the
// exclusive bound). The timestamp prefers the last record of this batch, then the
// previously tracked value.
long tieredOffset = reachedEnd ? stoppingOffset - 1 : lastRecord.logOffset();
long tieredTimestamp;
if (lastRecord != null) {
tieredTimestamp = lastRecord.timestamp();
} else {
LogOffsetAndTimestamp latest = currentTableTieredOffsetAndTimestamp.get(bucket);
tieredTimestamp = latest != null ? latest.timestamp : UNKNOWN_BUCKET_TIMESTAMP;
}
ScanRecord lastRecord = bucketScanRecords.get(bucketScanRecords.size() - 1);
currentTableTieredOffsetAndTimestamp.put(
bucket,
new LogOffsetAndTimestamp(lastRecord.logOffset(), lastRecord.timestamp()));
// has reached the end of the split,
if (lastRecord.logOffset() >= stoppingOffset - 1) {
currentTableStoppingOffsets.remove(bucket);
if (bucket.getPartitionId() != null) {
currentLogScanner.unsubscribe(bucket.getPartitionId(), bucket.getBucket());
} else {
// todo: should unsubscribe the log split once unsubscribing a bucket of an
// un-partitioned table is supported
}
TieringSplit currentTieringSplit = currentTableSplitsByBucket.remove(bucket);
String currentSplitId = currentTieringSplit.splitId();
// put write result of the bucket
writeResults.put(
bucket,
completeLakeWriter(
bucket,
currentTieringSplit.getPartitionName(),
stoppingOffset,
lastRecord.timestamp()));
// put split of the bucket
finishedSplitIds.put(bucket, currentSplitId);
LOG.info(
"Finish tier bucket {} for table {}, split: {}.",
bucket,
currentTablePath,
currentSplitId);
bucket, new LogOffsetAndTimestamp(tieredOffset, tieredTimestamp));

if (!reachedEnd) {
continue;
}

currentTableStoppingOffsets.remove(bucket);
if (bucket.getPartitionId() != null) {
currentLogScanner.unsubscribe(bucket.getPartitionId(), bucket.getBucket());
} else {
// todo: should unsubscribe the log split once unsubscribing a bucket of an
// un-partitioned table is supported
}
TieringSplit currentTieringSplit = currentTableSplitsByBucket.remove(bucket);
String currentSplitId = currentTieringSplit.splitId();
writeResults.put(
bucket,
completeLakeWriter(
bucket,
currentTieringSplit.getPartitionName(),
stoppingOffset,
tieredTimestamp));
finishedSplitIds.put(bucket, currentSplitId);
LOG.info(
"Finish tier bucket {} for table {}, split: {}.",
bucket,
currentTablePath,
currentSplitId);
}

if (!finishedSplitIds.isEmpty()) {
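To make the reachedEnd arithmetic concrete, a small worked example with illustrative values (stoppingOffset is the exclusive upper bound of the split):

    long stoppingOffset = 100L;
    // Progress-only path: the scanner reports the exclusive upper bound directly,
    // so lastConsumedOffset = 100 means offsets up to 99 are consumed -> reachedEnd.
    boolean viaOffset = 100L >= stoppingOffset; // true
    // Fallback path: the last materialized record must sit at the final in-range
    // offset, stoppingOffset - 1 = 99 -> reachedEnd.
    boolean viaRecord = 99L >= stoppingOffset - 1; // true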