[flink] Support kv snapshot lease in tiering service (#2898)#3286
[flink] Support kv snapshot lease in tiering service (#2898)#3286XuQianJin-Stars wants to merge 6 commits intoapache:mainfrom
Conversation
TieringSourceEnumerator now acquires a KV snapshot lease for all TieringSnapshotSplits before they are assigned to readers, and releases the lease when the table finishes or fails tiering, or when a reader failover returns the splits. A best-effort dropLease is also performed on enumerator close. This prevents the Fluss server from cleaning up snapshots that the tiering job still depends on. One lease id per tiering job (UUID-based) is reused across tables and persisted into TieringSourceEnumeratorState so that it survives enumerator restore instead of leaking orphan leases. The lease uses a fixed 1-day duration that is implicitly renewed by every acquireSnapshots call, and UnsupportedVersionException from older Fluss servers is downgraded to a warning to keep backward compatibility.
There was a problem hiding this comment.
Pull request overview
This PR adds KV snapshot lease management to the Flink tiering source so that Fluss server-side snapshot GC cannot delete snapshots while a tiering job is still consuming them. It also extends the enumerator checkpoint state to persist a single job-scoped lease id across JM failover / restore.
Changes:
- Add snapshot-lease acquisition before assigning
TieringSnapshotSplits, and release tracking per table on finish/fail/failover inTieringSourceEnumerator. - Persist
kvSnapshotLeaseIdinTieringSourceEnumeratorStatewith a bumped, backward-compatible serializer version. - Update
TieringSourcerestoration path and add/extend tests covering lease id persistence and lease lifecycle behaviors.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java | Implements per-table leased-bucket tracking and acquire/release logic tied to tiering lifecycle events. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorState.java | Adds nullable kvSnapshotLeaseId to enumerator checkpoint state with proper equals/hashCode. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializer.java | Introduces v1 serializer format that persists the lease id and remains compatible with v0 empty-state. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java | Restores the enumerator using the checkpointed lease id. |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java | Adds assertions/tests for lease acquisition and release across finish/fail/failover and lease id restore. |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializerTest.java | Adds round-trip + v0 compatibility tests for the new lease id state field. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // NOTE: we intentionally do NOT drop the kv snapshot lease here. The lease id is | ||
| // persisted into the enumerator checkpoint state and will be reused by the restored | ||
| // enumerator after a JM failover. Dropping it on close would destroy the lease that | ||
| // the restored enumerator expects to reuse, potentially causing the referenced | ||
| // snapshots to be garbage-collected before tiering finishes. The lease will expire |
| private void maybeReleaseKvSnapshotLease(long tableId) { | ||
| Set<TableBucket> buckets = leasedBucketsByTable.remove(tableId); | ||
| if (flussAdmin == null || buckets == null || buckets.isEmpty()) { | ||
| return; | ||
| } |
| private void maybeReleaseKvSnapshotLeaseAsync(long tableId) { | ||
| Set<TableBucket> buckets = leasedBucketsByTable.remove(tableId); | ||
| if (flussAdmin == null || buckets == null || buckets.isEmpty()) { | ||
| return; | ||
| } |
| flussAdmin | ||
| .createKvSnapshotLease(kvSnapshotLeaseId, KV_SNAPSHOT_LEASE_DURATION_MS) | ||
| .acquireSnapshots(bucketsToLease) | ||
| .get(); | ||
| leasedBucketsByTable |
| String leaseId = obj.getKvSnapshotLeaseId(); | ||
| if (leaseId != null) { | ||
| out.writeBoolean(true); | ||
| out.writeUTF(leaseId); | ||
| } else { | ||
| out.writeBoolean(false); | ||
| } | ||
| final byte[] result = out.getCopyOfBuffer(); | ||
| out.clear(); | ||
| return result; |
…ing on lease release failure, log unavailable snapshots on acquire
- Reduce retry timeout from 30s to 5s in testTableReachMaxTieringDuration - Add 10s timeout protection to waitUntilTieringTableSplitAssignmentReady method - Reduce sleep time from 3000ms to 1000ms for faster test execution - Fix checkstyle trailing whitespace issues These changes prevent 60-minute CI timeout by reducing excessive waiting in tests.
… blocking coordinator Use SplitEnumeratorContext.callAsync to run the blocking release RPC on the coordinator worker pool and apply bookkeeping updates back on the coordinator thread, so failover does not stall the enumerator while still keeping leasedBucketsByTable mutations single-threaded. Update testLeaseReleasedOnReaderFailover to drive the next one-time callable explicitly so the release RPC is observed before the assertion, without draining subsequent heartbeat callables that would re-acquire a lease for the same table.
luoyuxia
left a comment
There was a problem hiding this comment.
@XuQianJin-Stars Thanks for the pr. Left minor comments. PTAL
| // attempt have not been fully registered yet. The pending failed-table report and the | ||
| // subsequent split request will be driven by the periodic callAsync once failover is | ||
| // marked complete. | ||
| if (isFailOvering) { |
There was a problem hiding this comment.
why change this failover logic?
| if (maxAttempts.size() == 1 && globalMaxAttempt >= 1) { | ||
| LOG.info( | ||
| "Failover completed. All {} subtasks reached the same attempt number {}. Current registered readers are {}", | ||
| "All {} subtasks reached the same attempt number {}. Current registered readers are {}. Waiting for failed-table report to complete before clearing failover state.", |
There was a problem hiding this comment.
why change this failover logic?
| * abnormally. | ||
| * | ||
| * <p>TODO: introduce an explicit periodic lease-renewal mechanism so that a single tiering | ||
| * round that exceeds {@link #KV_SNAPSHOT_LEASE_DURATION_MS} (e.g. for very large tables) will |
There was a problem hiding this comment.
nit: Javadoc pointing to itself
| * not see its snapshots garbage-collected mid-flight. Tracked as a follow-up issue; tiering | ||
| * rounds are typically minute-level today so a 1-day lease is sufficient in practice. | ||
| */ | ||
| private static final long KV_SNAPSHOT_LEASE_DURATION_MS = Duration.ofDays(1).toMillis(); |
There was a problem hiding this comment.
change to Duration.ofHours(6)? I think 6 hource should be fine for most of case.
| pendingSplits.clear(); | ||
| // Release leases asynchronously to avoid blocking the coordinator thread when | ||
| // multiple tables are involved and RPC calls may time out. | ||
| for (Long tableId : tableIdsToRelease) { |
There was a problem hiding this comment.
can we just release all in leasedBucketsByTable? So that we won't need to collect tableIdsToRelease.
| * Asynchronous variant of {@link #maybeReleaseKvSnapshotLease(long)} used during failover to | ||
| * avoid blocking the coordinator thread when multiple tables need to be released. | ||
| */ | ||
| private void maybeReleaseKvSnapshotLeaseAsync(long tableId) { |
There was a problem hiding this comment.
maybeReleaseKvSnapshotLeaseAsync is most same to maybeReleaseKvSnapshotLease, can we just keep one single method?
| .releaseSnapshots(bucketsToRelease) | ||
| .get(); | ||
| // Only drop the bookkeeping entry after the server confirms the release. | ||
| Set<TableBucket> tracked = leasedBucketsByTable.get(tableId); |
There was a problem hiding this comment.
can we just call leasedBucketsByTable.remove(tableId);?
IIUC, bucketsToRelease is same with the buckets in the leasedBucketsByTable
| tableId, | ||
| e); | ||
| } else { | ||
| // Keep the buckets tracked so we (or the next failover/close) can retry. |
There was a problem hiding this comment.
when will we do the retry?
| if (System.currentTimeMillis() - startTime > timeoutMs) { | ||
| throw new AssertionError( | ||
| String.format( | ||
| "等待分配超时: 期望 %d 个分配, 实际 %d 个分配, 超时时间 %dms", |
There was a problem hiding this comment.
why change this?
Also, please use engligh
| * The state of the {@link TieringSourceEnumerator}. Stores the KV snapshot lease id so that it can | ||
| * be recovered after a checkpoint restore, avoiding orphaned leases on the server side. | ||
| */ | ||
| public class TieringSourceEnumeratorState { |
There was a problem hiding this comment.
It will breaks the statless design of tiering service. I think we can just ignore the kvSnapshotLeaseId and delegate fluss cluster to do snapshot ttl.
TieringSourceEnumerator now acquires a KV snapshot lease for all TieringSnapshotSplits before they are assigned to readers, and releases the lease when the table finishes or fails tiering, or when a reader failover returns the splits. A best-effort
dropLeaseis also performed on enumerator close. This prevents the Fluss server from cleaning up snapshots that the tiering job still depends on.One lease id per tiering job (UUID-based) is reused across tables and persisted into
TieringSourceEnumeratorStateso that it survives enumerator restore instead of leaking orphan leases. The lease uses a fixed 1-day duration that is implicitly renewed by everyacquireSnapshotscall, andUnsupportedVersionExceptionfrom older Fluss servers is downgraded to a warning to keep backward compatibility.Purpose
Linked issue: close #2898
Before this change, the tiering job only read Fluss KV snapshots without holding any lease on them. A long-running tiering job could therefore race with the server-side snapshot GC: the server may clean up a snapshot that is still being / about to be consumed by the tiering
SourceReader, causing tiering failures or data loss on the lake side.This PR makes
TieringSourcehold a KV snapshot lease for the full lifecycle of each snapshot split it hands out, so that the Fluss server will not reclaim those snapshots while tiering is in progress.Brief change log
TieringSourceEnumeratorkvSnapshotLeaseIdper tiering job (UUID-based) and reuse it across all tables.TieringSnapshotSplitto a reader, callacquireSnapshots(leaseId, snapshots, 1 day)on the admin / gateway client to acquire a lease covering all snapshot splits of the table.releaseSnapshots) when the table finishes tiering, fails tiering, or when a reader failover returns the splits back to the enumerator.close(), best-effortdropLease(leaseId)to release everything still held by this job.UnsupportedVersionException(old server) to a warning log, so the tiering job keeps working against older Fluss servers without the lease API.TieringSourceEnumeratorState+TieringSourceEnumeratorStateSerializerkvSnapshotLeaseIdinto the enumerator checkpoint state (new serializer version, backward compatible with the previous version).TieringSourcerestoreEnumerator, reuse the persistedkvSnapshotLeaseIdfrom the checkpoint so the recovered enumerator does not generate a new UUID and leak the previous lease.Tests
TieringSourceEnumeratorTestdropLeaseis invoked on enumerator close; enumerator works gracefully when the server returnsUnsupportedVersionException.TieringSourceEnumeratorStateSerializerTestkvSnapshotLeaseIdfield, plus a backward-compatibility case that deserializes a state written by the previous serializer version.mvn clean verifypasses locally for the affected modules.API and Format
TieringSourceEnumeratorStatecheckpoint format is extended with a newkvSnapshotLeaseIdfield. The serializer version is bumped and older checkpoints remain readable (the field defaults to a freshly generated UUID on restore from old state).acquireSnapshots/releaseSnapshots/dropLeaseadmin APIs.Documentation