Skip to content

[KV] Kvscan Server Side#3151

Merged
wuchong merged 17 commits intoapache:mainfrom
polyzos:kvscan-server-side
May 9, 2026
Merged

[KV] Kvscan Server Side#3151
wuchong merged 17 commits intoapache:mainfrom
polyzos:kvscan-server-side

Conversation

@polyzos
Copy link
Copy Markdown
Contributor

@polyzos polyzos commented Apr 21, 2026

This PR brings in the server-side implementation for FIP-17, introduced here

#2809

@polyzos polyzos force-pushed the kvscan-server-side branch from 4577f63 to 8fe21dc Compare April 21, 2026 13:21
@polyzos
Copy link
Copy Markdown
Contributor Author

polyzos commented Apr 21, 2026

@polyzos polyzos requested review from Copilot and wuchong April 21, 2026 13:22
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

Adds server-side support for KV full-scan sessions (FIP-17), including session lifecycle management, RPC handling, and operator-facing configuration.

Changes:

  • Introduces ScannerManager/ScannerContext to manage server-side scan sessions with TTL eviction and concurrency limits.
  • Implements TabletService#scanKv to open/continue/close scans and stream batched results.
  • Wires scanner cleanup into replica leadership/stop paths and documents new configuration options.

Reviewed changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
website/docs/maintenance/configuration.md Documents new KV scanner TTL/limits configs.
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java Adds config keys for scanner TTL, eviction interval, and limits.
fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java Adds openScan to create snapshot-backed scan contexts.
fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerContext.java New scan session state holder (snapshot/iterator/lease).
fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerManager.java New session manager with TTL eviction and limit enforcement.
fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java Implements scanKv RPC using ScannerManager.
fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java Instantiates/closes ScannerManager and injects into services.
fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java Closes scanners on leadership change / stopReplica; adds leader KV accessor.
fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java Safety-net close of scanners in dropKv().
fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java Adds unit tests for openScan behavior.
fluss-server/src/test/java/org/apache/fluss/server/kv/scan/ScannerManagerTest.java New tests for session creation, limits, TTL eviction, and shutdown.
fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java Ensures scanners are closed when stopping replicas.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
Comment thread fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
Comment thread fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java Outdated
Comment thread fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java Outdated
@wuchong wuchong linked an issue May 5, 2026 that may be closed by this pull request
Copy link
Copy Markdown
Member

@wuchong wuchong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @polyzos , I left some comments.

}

/** Injects the {@link ScannerManager} so that {@link #dropKv()} can close active scanners. */
public void setScannerManager(@Nullable ScannerManager scannerManager) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a little hack. The scannerManager should be immutable when the Replica is created. It should be a part of the constructor.

conf.get(ConfigOptions.LOG_REPLICA_MAX_LAG_TIME).toMillis() / 2);
}

public void setScannerManager(ScannerManager scannerManager) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a little hack. The ScannerManager should never be null and should be immutable, so it should be set in the constructor.

* {@link #closeScannersForBucket(TableBucket)} must be called when a bucket loses leadership to
* release all RocksDB snapshot/iterator resources for that bucket promptly.
*/
public class ScannerManager implements AutoCloseableAsync {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, this class is thread-safe, otherwise, it can be shared across worker threads. Please add @ThreadSafe annotation to the class.

*/
@Nullable
public ScannerContext createScanner(
KvTablet kvTablet, TableBucket tableBucket, @Nullable Long limit) throws IOException {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can derive the TableBucket variable from kvTablet.getTableBucket(). Thus, we don't need the tableBucket parameter. Otherwise, we have to check the consistence between kvTablet.getTableBucket() and tableBucket, which is an overhead.

* bucket loses leadership to prevent stale RocksDB snapshot/iterator leaks.
*/
public void closeScannersForBucket(TableBucket tableBucket) {
List<String> toRemove = new ArrayList<>();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we can track ScannerContext rather than id in the toRemove List to avoid additional scanners.get(key) concurrent map lookup.

while (context.isValid() && totalBytes < batchSizeBytes) {
byte[] value = context.currentValue();
builder.append(value);
totalBytes += value.length;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use the builder.sizeInBytes rather than the totalBytes which is more accurate for the final record batch size.

Comment on lines +575 to +576
response.setErrorCode(Errors.forException(e).code());
response.setErrorMessage(e.getMessage() != null ? e.getMessage() : "");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's suggested to use

ApiError error = ApiError.fromThrowable(e).
response.setError(error.error().code(), error.message());


String scannerId = generateScannerId();
ScannerContext context =
kvTablet.openScan(scannerId, limit != null ? limit : -1L, clock.milliseconds());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unlike lookups(), prefixLookup(), and limitKvScan(), replicaManager.getLeaderKvTablet(tableBucket) returns the raw KvTablet after only an unsynchronized isLeader() check. If leadership flips after getLeaderKvTablet() returns but before
createScanner() registers the session, makeFollowers() has already run closeScannersForBucket(), so the
new scanner survives on a follower. Subsequent scanKv(scanner_id=...) calls never re-check leadership,
which lets clients keep reading a stale snapshot from a bucket that no longer has the leader.

A safe way to open scan should be have a openScan on the Replica, and returns the rocksdb snapshot under a leaderIsrUpdateLock read lock, just like other methods (lookups, limitScan...).

Comment on lines +59 to +70
private long remainingLimit;
// Initial value -1 so that the first client call_seq_id of 0 satisfies the server's
// in-order check: expectedSeqId = callSeqId + 1 = -1 + 1 = 0.
// callSeqId validation is only performed for continuation requests (those carrying a
// scanner_id), never for the initial open request (those carrying a bucket_scan_req).
private int callSeqId = -1;

/**
* Wall-clock timestamp (ms) of the most recent request that touched this session. Used by
* {@link ScannerManager} for TTL-based eviction.
*/
private long lastAccessTime;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ScannerManager updates lastAccessTime on RPC workers and reads it later from the background TTL evictor, but this state is stored in plain fields. Without volatile, the evictor can observe a stale timestamp and expire a scanner that was just used; callSeqId has the same visibility problem if a continuation is handled by a different worker after reconnect/retry. Since ScannerContext is intentionally shared across threads by the new manager, these fields need a happens-before edge.

}
}

return CompletableFuture.completedFuture(response);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ScanKvResponse contract in proto file says the initial batch must carry the log offset captured when the RocksDB snapshot was opened, but this implementation only fills scanner_id, has_more_results, and records.

Without that offset, a client cannot do a consistent snapshot-to-log handoff (and the empty-bucket fast path has no other way to return it), so scans can miss or duplicate updates that race with the snapshot.

We should initialize the log_offset into the ScannerContext when opening scan on the replica.

@polyzos polyzos force-pushed the kvscan-server-side branch from e406f65 to 627f403 Compare May 6, 2026 13:29
@polyzos
Copy link
Copy Markdown
Contributor Author

polyzos commented May 6, 2026

@wuchong Thanks you for your thorough and detailed review.

I addressed all the comments, rebased on main and added some improvements. Let me know if you think n there is something more that needs to be addressed.

Copy link
Copy Markdown
Member

@wuchong wuchong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

I pushed 2 commits to fix the remaining issues. Will merge this once CI is passed.

Comment on lines +202 to +207
// Fence: closing the iterator while another thread is mid iterator.next() is
// undefined at the RocksDB JNI boundary. tryAcquireForUse() re-checks `closed`
// after winning its CAS, so any racing waiter releases inUse and lets us through.
while (inUse.get()) {
Thread.yield();
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current implementation uses an unbounded spin loop with Thread.yield(), which poses a significant risk of infinite CPU consumption if the inUse flag is never cleared due to bugs or stuck operations. Furthermore, relying on Thread.yield() is generally discouraged in production environments. Regarding the concern about undefined behavior at the RocksDB JNI boundary, I believe it is unnecessary here. If a scan is active while the corresponding KV tablet is dropped or the scan times out, the appropriate response is to throw an exception to the user rather than waiting indefinitely. Therefore, I recommend replacing the blocking wait with a direct update that sets inUse to false. This approach simplifies the logic and avoids the potential performance pitfalls of busy-waiting.

ioExecutor);
ioExecutor,
scannerManager,
kvScanMaxBatchSizeBytes);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should move kvScanMaxBatchSizeBytes into ScannerManager to reduce the number of parameters in the TabletService constructor. Since this configuration logically belongs to ScannerManager, this change improves code cohesion and simplifies initialization.

@wuchong wuchong force-pushed the kvscan-server-side branch from 73cdf8e to 6b611fb Compare May 9, 2026 09:00
@wuchong wuchong merged commit c6a1c2d into apache:main May 9, 2026
8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement Server-Side KvScan

3 participants