[flink] Support kv snapshot lease in tiering service (#2898)#3286

Open
XuQianJin-Stars wants to merge 6 commits into apache:main from XuQianJin-Stars:issue-2898-kv-snapshot-lease-tiering

Conversation

@XuQianJin-Stars
Contributor

TieringSourceEnumerator now acquires a KV snapshot lease for all TieringSnapshotSplits before they are assigned to readers, and releases the lease when the table finishes or fails tiering, or when a reader failover returns the splits. A best-effort dropLease is also performed on enumerator close. This prevents the Fluss server from cleaning up snapshots that the tiering job still depends on.

One lease id per tiering job (UUID-based) is reused across tables and persisted into TieringSourceEnumeratorState so that it survives enumerator restore instead of leaking orphan leases. The lease uses a fixed 1-day duration that is implicitly renewed by every acquireSnapshots call, and UnsupportedVersionException from older Fluss servers is downgraded to a warning to keep backward compatibility.

Purpose

Linked issue: close #2898

Before this change, the tiering job only read Fluss KV snapshots without holding any lease on them. A long-running tiering job could therefore race with the server-side snapshot GC: the server may clean up a snapshot that is still being / about to be consumed by the tiering SourceReader, causing tiering failures or data loss on the lake side.

This PR makes TieringSource hold a KV snapshot lease for the full lifecycle of each snapshot split it hands out, so that the Fluss server will not reclaim those snapshots while tiering is in progress.

Brief change log

  • TieringSourceEnumerator
    • Generate one kvSnapshotLeaseId per tiering job (UUID-based) and reuse it across all tables.
    • Before assigning any TieringSnapshotSplit to a reader, call acquireSnapshots(leaseId, snapshots, 1 day) on the admin / gateway client to acquire a lease covering all snapshot splits of the table.
    • Track in-flight leased snapshots per table; release the lease (releaseSnapshots) when the table finishes tiering, fails tiering, or when a reader failover returns the splits back to the enumerator.
    • On enumerator close(), best-effort dropLease(leaseId) to release everything still held by this job.
    • Downgrade UnsupportedVersionException (old server) to a warning log, so the tiering job keeps working against older Fluss servers without the lease API.
  • TieringSourceEnumeratorState + TieringSourceEnumeratorStateSerializer
    • Persist kvSnapshotLeaseId into the enumerator checkpoint state (new serializer version, backward compatible with the previous version).
  • TieringSource
    • On restoreEnumerator, reuse the persisted kvSnapshotLeaseId from the checkpoint so the recovered enumerator does not generate a new UUID and leak the previous lease.
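The acquire-before-assign / release-on-terminal-event bookkeeping described above can be sketched in plain Java. This is an illustrative model only: the `LeaseClient` interface and all class and method names here are stand-ins, not the actual Fluss admin API or enumerator code.

```java
import java.util.*;

/** Illustrative sketch of the enumerator's lease bookkeeping (not the real Fluss classes). */
class LeaseBookkeepingSketch {

    /** Hypothetical stand-in for the Fluss admin lease operations the enumerator relies on. */
    interface LeaseClient {
        void acquireSnapshots(String leaseId, Collection<String> buckets, long durationMs);
        void releaseSnapshots(String leaseId, Collection<String> buckets);
        void dropLease(String leaseId);
    }

    static final long LEASE_DURATION_MS = 24 * 60 * 60 * 1000L; // fixed 1-day lease

    private final String leaseId = UUID.randomUUID().toString(); // one id per tiering job
    private final Map<Long, Set<String>> leasedBucketsByTable = new HashMap<>();
    private final LeaseClient client;

    LeaseBookkeepingSketch(LeaseClient client) {
        this.client = client;
    }

    /** Called before the snapshot splits of a table are handed to readers. */
    void beforeAssign(long tableId, Set<String> buckets) {
        // every acquireSnapshots call also implicitly renews the lease
        client.acquireSnapshots(leaseId, buckets, LEASE_DURATION_MS);
        leasedBucketsByTable.computeIfAbsent(tableId, k -> new HashSet<>()).addAll(buckets);
    }

    /** Called when the table finishes/fails tiering or a reader failover returns its splits. */
    void onTableDone(long tableId) {
        Set<String> buckets = leasedBucketsByTable.remove(tableId);
        if (buckets != null && !buckets.isEmpty()) {
            client.releaseSnapshots(leaseId, buckets);
        }
    }

    /** Best-effort cleanup on enumerator close. */
    void close() {
        try {
            client.dropLease(leaseId);
        } catch (RuntimeException e) {
            // best-effort: the fixed-duration lease expires on the server anyway
        }
        leasedBucketsByTable.clear();
    }

    Set<Long> leasedTables() {
        return leasedBucketsByTable.keySet();
    }
}
```

The key invariant is ordering: the server must confirm the lease before any split reaches a reader, and the per-table tracking map is what lets finish, fail, and failover all funnel into the same release path.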

Tests

  • TieringSourceEnumeratorTest
    • New cases covering: lease is acquired before snapshot splits are assigned; lease is released on table finish / fail / reader failover; dropLease is invoked on enumerator close; enumerator works gracefully when the server returns UnsupportedVersionException.
  • TieringSourceEnumeratorStateSerializerTest
    • Round-trip tests for the new kvSnapshotLeaseId field, plus a backward-compatibility case that deserializes a state written by the previous serializer version.
  • mvn clean verify passes locally for the affected modules.

API and Format

  • No public user-facing API change.
  • TieringSourceEnumeratorState checkpoint format is extended with a new kvSnapshotLeaseId field. The serializer version is bumped and older checkpoints remain readable (the field defaults to a freshly generated UUID on restore from old state).
  • No storage / wire format change on the Fluss server side; this PR only consumes the existing acquireSnapshots / releaseSnapshots / dropLease admin APIs.
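A version-gated, backward-compatible serializer of the kind described can be sketched with plain Java I/O. The boolean-flag encoding for the nullable lease id mirrors the pattern visible in the diff excerpt below; the class name and method signatures here are illustrative, not the Fluss serializer itself.

```java
import java.io.*;

/** Sketch of a versioned enumerator-state serializer (illustrative, not the Fluss code). */
class LeaseStateSerializerSketch {
    static final int CURRENT_VERSION = 1; // v0 carried no lease id field

    static byte[] serialize(String leaseId) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            // nullable field: write a presence flag, then the UTF payload if present
            if (leaseId != null) {
                out.writeBoolean(true);
                out.writeUTF(leaseId);
            } else {
                out.writeBoolean(false);
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static String deserialize(int version, byte[] bytes) {
        if (version == 0) {
            // old checkpoint: no lease id was ever written; the caller
            // falls back to generating a fresh UUID on restore
            return null;
        }
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            return in.readBoolean() ? in.readUTF() : null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Dispatching on the checkpointed version number, rather than sniffing the bytes, is what keeps old checkpoints readable after the format gains a field.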

Documentation

  • No new user-facing feature or configuration option is introduced, so no documentation update is required.

Contributor

Copilot AI left a comment


Pull request overview

This PR adds KV snapshot lease management to the Flink tiering source so that Fluss server-side snapshot GC cannot delete snapshots while a tiering job is still consuming them. It also extends the enumerator checkpoint state to persist a single job-scoped lease id across JM failover / restore.

Changes:

  • Add snapshot-lease acquisition before assigning TieringSnapshotSplits, and release tracking per table on finish/fail/failover in TieringSourceEnumerator.
  • Persist kvSnapshotLeaseId in TieringSourceEnumeratorState with a bumped, backward-compatible serializer version.
  • Update TieringSource restoration path and add/extend tests covering lease id persistence and lease lifecycle behaviors.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java Implements per-table leased-bucket tracking and acquire/release logic tied to tiering lifecycle events.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorState.java Adds nullable kvSnapshotLeaseId to enumerator checkpoint state with proper equals/hashCode.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializer.java Introduces v1 serializer format that persists the lease id and remains compatible with v0 empty-state.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java Restores the enumerator using the checkpointed lease id.
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java Adds assertions/tests for lease acquisition and release across finish/fail/failover and lease id restore.
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializerTest.java Adds round-trip + v0 compatibility tests for the new lease id state field.


Comment on lines +589 to +593
// NOTE: we intentionally do NOT drop the kv snapshot lease here. The lease id is
// persisted into the enumerator checkpoint state and will be reused by the restored
// enumerator after a JM failover. Dropping it on close would destroy the lease that
// the restored enumerator expects to reuse, potentially causing the referenced
// snapshots to be garbage-collected before tiering finishes. The lease will expire
Comment on lines +680 to +684
private void maybeReleaseKvSnapshotLease(long tableId) {
Set<TableBucket> buckets = leasedBucketsByTable.remove(tableId);
if (flussAdmin == null || buckets == null || buckets.isEmpty()) {
return;
}
Comment on lines +712 to +716
private void maybeReleaseKvSnapshotLeaseAsync(long tableId) {
Set<TableBucket> buckets = leasedBucketsByTable.remove(tableId);
if (flussAdmin == null || buckets == null || buckets.isEmpty()) {
return;
}
Comment on lines +648 to +652
flussAdmin
.createKvSnapshotLease(kvSnapshotLeaseId, KV_SNAPSHOT_LEASE_DURATION_MS)
.acquireSnapshots(bucketsToLease)
.get();
leasedBucketsByTable
Comment on lines +58 to +67
String leaseId = obj.getKvSnapshotLeaseId();
if (leaseId != null) {
out.writeBoolean(true);
out.writeUTF(leaseId);
} else {
out.writeBoolean(false);
}
final byte[] result = out.getCopyOfBuffer();
out.clear();
return result;
…ing on lease release failure, log unavailable snapshots on acquire
- Reduce retry timeout from 30s to 5s in testTableReachMaxTieringDuration
- Add 10s timeout protection to waitUntilTieringTableSplitAssignmentReady method
- Reduce sleep time from 3000ms to 1000ms for faster test execution
- Fix checkstyle trailing whitespace issues

These changes prevent 60-minute CI timeout by reducing excessive waiting in tests.
… blocking coordinator

Use SplitEnumeratorContext.callAsync to run the blocking release RPC on the coordinator worker pool and apply bookkeeping updates back on the coordinator thread, so failover does not stall the enumerator while still keeping leasedBucketsByTable mutations single-threaded. Update testLeaseReleasedOnReaderFailover to drive the next one-time callable explicitly so the release RPC is observed before the assertion, without draining subsequent heartbeat callables that would re-acquire a lease for the same table.
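The pattern described, running the blocking release RPC off-thread while funneling all bookkeeping updates back onto a single coordinator thread, can be modeled in plain Java. Since Flink's `SplitEnumeratorContext` is not available standalone, this sketch substitutes a single-threaded executor for the coordinator thread and a fixed pool for the worker pool; the names are illustrative.

```java
import java.util.concurrent.*;
import java.util.function.BiConsumer;

/** Sketch of the callAsync pattern: blocking RPC off-thread, result applied on the coordinator. */
class CallAsyncSketch {
    // mimics the single coordinator thread that owns all enumerator state
    private final ExecutorService coordinator = Executors.newSingleThreadExecutor();
    // mimics the coordinator worker pool used for blocking calls
    private final ExecutorService workerPool = Executors.newFixedThreadPool(4);

    /** Plain-Java analogue of SplitEnumeratorContext.callAsync(callable, handler). */
    <T> void callAsync(Callable<T> blockingCall, BiConsumer<T, Throwable> handler) {
        workerPool.submit(() -> {
            T result = null;
            Throwable error = null;
            try {
                result = blockingCall.call(); // blocking RPC runs off the coordinator thread
            } catch (Throwable t) {
                error = t;
            }
            final T r = result;
            final Throwable e = error;
            // the handler is re-queued onto the coordinator thread, so shared state
            // such as leasedBucketsByTable only ever mutates single-threaded
            coordinator.submit(() -> handler.accept(r, e));
        });
    }

    void shutdown() {
        workerPool.shutdown();
        coordinator.shutdown();
    }
}
```

The design choice is the same as in the commit message: the coordinator never blocks on the RPC, yet no locking is needed because every mutation still happens on one thread.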
Contributor

@luoyuxia luoyuxia left a comment


@XuQianJin-Stars Thanks for the PR. Left minor comments. PTAL

// attempt have not been fully registered yet. The pending failed-table report and the
// subsequent split request will be driven by the periodic callAsync once failover is
// marked complete.
if (isFailOvering) {

why change this failover logic?

if (maxAttempts.size() == 1 && globalMaxAttempt >= 1) {
LOG.info(
"Failover completed. All {} subtasks reached the same attempt number {}. Current registered readers are {}",
"All {} subtasks reached the same attempt number {}. Current registered readers are {}. Waiting for failed-table report to complete before clearing failover state.",

why change this failover logic?

* abnormally.
*
* <p>TODO: introduce an explicit periodic lease-renewal mechanism so that a single tiering
* round that exceeds {@link #KV_SNAPSHOT_LEASE_DURATION_MS} (e.g. for very large tables) will

nit: Javadoc pointing to itself

* not see its snapshots garbage-collected mid-flight. Tracked as a follow-up issue; tiering
* rounds are typically minute-level today so a 1-day lease is sufficient in practice.
*/
private static final long KV_SNAPSHOT_LEASE_DURATION_MS = Duration.ofDays(1).toMillis();

change to Duration.ofHours(6)? I think 6 hours should be fine for most cases.

pendingSplits.clear();
// Release leases asynchronously to avoid blocking the coordinator thread when
// multiple tables are involved and RPC calls may time out.
for (Long tableId : tableIdsToRelease) {

can we just release all in leasedBucketsByTable? So that we won't need to collect tableIdsToRelease.

* Asynchronous variant of {@link #maybeReleaseKvSnapshotLease(long)} used during failover to
* avoid blocking the coordinator thread when multiple tables need to be released.
*/
private void maybeReleaseKvSnapshotLeaseAsync(long tableId) {

maybeReleaseKvSnapshotLeaseAsync is most same to maybeReleaseKvSnapshotLease, can we just keep one single method?

.releaseSnapshots(bucketsToRelease)
.get();
// Only drop the bookkeeping entry after the server confirms the release.
Set<TableBucket> tracked = leasedBucketsByTable.get(tableId);

can we just call leasedBucketsByTable.remove(tableId);?
IIUC, bucketsToRelease is same with the buckets in the leasedBucketsByTable

tableId,
e);
} else {
// Keep the buckets tracked so we (or the next failover/close) can retry.

when will we do the retry?

if (System.currentTimeMillis() - startTime > timeoutMs) {
throw new AssertionError(
String.format(
"等待分配超时: 期望 %d 个分配, 实际 %d 个分配, 超时时间 %dms",

why change this?
Also, please use English

* The state of the {@link TieringSourceEnumerator}. Stores the KV snapshot lease id so that it can
* be recovered after a checkpoint restore, avoiding orphaned leases on the server side.
*/
public class TieringSourceEnumeratorState {

This breaks the stateless design of the tiering service. I think we can just ignore the kvSnapshotLeaseId and delegate snapshot TTL to the Fluss cluster.
