diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index df21129db6..02e016852f 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -515,6 +515,51 @@ public class ConfigOptions { + WRITER_ID_EXPIRATION_TIME.key() + " passing. The default value is 10 minutes."); + public static final ConfigOption KV_SCANNER_TTL = + key("kv.scanner.ttl") + .durationType() + .defaultValue(Duration.ofMinutes(10)) + .withDescription( + "The time-to-live for an idle KV scanner session on the server. A scanner that has not " + + "received a request within this duration will be automatically expired and its " + + "resources released. The default value is 10 minutes."); + + public static final ConfigOption KV_SCANNER_EXPIRATION_INTERVAL = + key("kv.scanner.expiration-interval") + .durationType() + .defaultValue(Duration.ofSeconds(30)) + .withDescription( + "How often the TTL evictor runs to close idle scanner sessions. " + + "The default value is 30 seconds."); + + public static final ConfigOption KV_SCANNER_MAX_PER_BUCKET = + key("kv.scanner.max-per-bucket") + .intType() + .defaultValue(8) + .withDescription( + "Maximum number of concurrent scanner sessions per bucket. " + + "Exceeding this limit returns TOO_MANY_SCANNERS. " + + "The default value is 8."); + + public static final ConfigOption KV_SCANNER_MAX_PER_SERVER = + key("kv.scanner.max-per-server") + .intType() + .defaultValue(200) + .withDescription( + "Maximum number of concurrent scanner sessions per tablet server. " + + "Exceeding this limit returns TOO_MANY_SCANNERS. 
" + + "The default value is 200."); + + public static final ConfigOption KV_SCANNER_MAX_BATCH_SIZE = + key("kv.scanner.max-batch-size") + .memoryType() + .defaultValue(MemorySize.parse("10mb")) + .withDescription( + "Server-side cap on the per-batch payload size for KV full-scan responses. " + + "The effective batch size is min(client-requested batch_size_bytes, " + + "this value). Protects the tablet server from out-of-memory if a " + + "client passes an excessively large batch size. The default value is 10mb."); + public static final ConfigOption TABLET_SERVER_CONTROLLED_SHUTDOWN_MAX_RETRIES = key("tablet-server.controlled-shutdown.max-retries") .intType() diff --git a/fluss-common/src/main/java/org/apache/fluss/record/DefaultValueRecordBatch.java b/fluss-common/src/main/java/org/apache/fluss/record/DefaultValueRecordBatch.java index 8d41ec4c86..3bb7539f61 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/DefaultValueRecordBatch.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/DefaultValueRecordBatch.java @@ -278,6 +278,11 @@ public DefaultValueRecordBatch build() throws IOException { return DefaultValueRecordBatch.pointToMemory(segment, 0); } + /** Returns the current serialized size of the batch, including the record-batch header. 
*/ + public int sizeInBytes() { + return sizeInBytes; + } + private void writeBatchHeader() throws IOException { outputView.setPosition(0); // update header diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java index 85c192c922..f380193997 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java @@ -58,6 +58,8 @@ import org.apache.fluss.server.kv.rocksdb.RocksDBStatistics; import org.apache.fluss.server.kv.rowmerger.DefaultRowMerger; import org.apache.fluss.server.kv.rowmerger.RowMerger; +import org.apache.fluss.server.kv.scan.OpenScanResult; +import org.apache.fluss.server.kv.scan.ScannerContext; import org.apache.fluss.server.kv.snapshot.KvFileHandleAndLocalPath; import org.apache.fluss.server.kv.snapshot.KvSnapshotDataUploader; import org.apache.fluss.server.kv.snapshot.RocksIncrementalSnapshot; @@ -70,12 +72,17 @@ import org.apache.fluss.server.log.LogTablet; import org.apache.fluss.server.metrics.group.TabletServerMetricGroup; import org.apache.fluss.server.utils.FatalErrorHandler; +import org.apache.fluss.server.utils.ResourceGuard; import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator; import org.apache.fluss.types.RowType; import org.apache.fluss.utils.BytesUtils; import org.apache.fluss.utils.FileUtils; +import org.apache.fluss.utils.IOUtils; import org.rocksdb.RateLimiter; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksIterator; +import org.rocksdb.Snapshot; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -757,6 +764,74 @@ public List limitScan(int limit) throws IOException { }); } + /** + * Opens a new full-scan session under the {@code kvLock} read lock. Returns an empty-bucket + * result (context = {@code null}, all RocksDB resources released internally) when the bucket + * has no rows. 
The returned {@link ScannerContext} is unregistered; the caller owns + * registration and close. + * + * @param limit row-count cap across all batches ({@code ≤ 0} means unlimited) + * @throws IOException if RocksDB is shutting down + */ + public OpenScanResult openScan(String scannerId, long limit, long initialAccessTimeMs) + throws IOException { + return inReadLock( + kvLock, + () -> { + rocksDBKv.checkIfRocksDBClosed(); + ResourceGuard.Lease lease = rocksDBKv.getResourceGuard().acquireResource(); + Snapshot snapshot = null; + ReadOptions readOptions = null; + RocksIterator iterator = null; + boolean success = false; + try { + snapshot = rocksDBKv.getDb().getSnapshot(); + // Capture under kvLock so the offset matches the data visible through + // the snapshot. + long capturedLogOffset = flushedLogOffset; + readOptions = new ReadOptions().setSnapshot(snapshot); + iterator = + rocksDBKv + .getDb() + .newIterator( + rocksDBKv.getDefaultColumnFamilyHandle(), + readOptions); + iterator.seekToFirst(); + if (!iterator.isValid()) { + return new OpenScanResult(null, capturedLogOffset); + } + ScannerContext context = + new ScannerContext( + scannerId, + tableBucket, + rocksDBKv, + iterator, + readOptions, + snapshot, + lease, + limit, + capturedLogOffset, + initialAccessTimeMs); + success = true; + return new OpenScanResult(context, capturedLogOffset); + } finally { + if (!success) { + IOUtils.closeQuietly(iterator); + IOUtils.closeQuietly(readOptions); + if (snapshot != null) { + try { + rocksDBKv.getDb().releaseSnapshot(snapshot); + } catch (Throwable t) { + LOG.warn("Error releasing RocksDB snapshot.", t); + } + IOUtils.closeQuietly(snapshot); + } + IOUtils.closeQuietly(lease); + } + } + }); + } + public KvBatchWriter createKvBatchWriter() { return rocksDBKv.newWriteBatch( writeBatchSize, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/OpenScanResult.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/OpenScanResult.java new file 
mode 100644 index 0000000000..5331a7a1cf --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/OpenScanResult.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv.scan; + +import org.apache.fluss.annotation.Internal; + +import javax.annotation.Nullable; + +/** + * Result of opening a KV full-scan session: an optional {@link ScannerContext} plus the log offset + * captured at snapshot time. The offset must reach the client even on the empty-bucket fast path so + * a snapshot-to-log handoff cannot miss or duplicate records. + */ +@Internal +public final class OpenScanResult { + + @Nullable private final ScannerContext context; + private final long logOffset; + + public OpenScanResult(@Nullable ScannerContext context, long logOffset) { + this.context = context; + this.logOffset = logOffset; + } + + /** {@code null} if the bucket was empty at snapshot time. 
*/ + @Nullable + public ScannerContext getContext() { + return context; + } + + public long getLogOffset() { + return logOffset; + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerContext.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerContext.java new file mode 100644 index 0000000000..6347a3d439 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerContext.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.kv.scan; + +import org.apache.fluss.exception.KvStorageException; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.server.kv.rocksdb.RocksDBKv; +import org.apache.fluss.server.utils.ResourceGuard; +import org.apache.fluss.utils.IOUtils; + +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.Snapshot; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.io.Closeable; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Server-side state for a single KV full-scan session: a pinned RocksDB {@link Snapshot}, a {@link + * RocksIterator} cursor, and a {@link ResourceGuard.Lease} that keeps the underlying RocksDB alive + * for the lifetime of the scan. + * + *

Cursor methods ({@link #advance()}, {@link #isValid()}, {@link #currentValue()}) are + * single-threaded; callers must hold the {@link #tryAcquireForUse()} flag while mutating cursor or + * sequence-id state. {@link #close()} is thread-safe and fences on that flag. + */ +@NotThreadSafe +public class ScannerContext implements Closeable { + private final String scannerId; + private final byte[] scannerIdBytes; + private final TableBucket tableBucket; + private final RocksDBKv rocksDBKv; + private final RocksIterator iterator; + private final ReadOptions readOptions; + private final Snapshot snapshot; + private final ResourceGuard.Lease resourceLease; + + /** Log offset captured when the snapshot was opened. */ + private final long logOffset; + + private long remainingLimit; + + // -1 so the first client call_seq_id of 0 satisfies expectedSeqId = callSeqId + 1. + // volatile: a continuation may run on a different RPC worker than the previous one. + private volatile int callSeqId = -1; + + /** Last-access wall-clock (ms); volatile for the TTL evictor. */ + private volatile long lastAccessTime; + + private final AtomicBoolean closed = new AtomicBoolean(false); + + private final AtomicBoolean inUse = new AtomicBoolean(false); + + public ScannerContext( + String scannerId, + TableBucket tableBucket, + RocksDBKv rocksDBKv, + RocksIterator iterator, + ReadOptions readOptions, + Snapshot snapshot, + ResourceGuard.Lease resourceLease, + long limit, + long logOffset, + long initialAccessTimeMs) { + this.scannerId = scannerId; + this.scannerIdBytes = scannerId.getBytes(StandardCharsets.UTF_8); + this.tableBucket = tableBucket; + this.rocksDBKv = rocksDBKv; + this.iterator = iterator; + this.readOptions = readOptions; + this.snapshot = snapshot; + this.resourceLease = resourceLease; + this.remainingLimit = limit <= 0 ? -1L : limit; + this.logOffset = logOffset; + this.lastAccessTime = initialAccessTimeMs; + } + + /** Log offset captured when the snapshot was opened. 
*/ + public long getLogOffset() { + return logOffset; + } + + public byte[] getScannerId() { + return scannerIdBytes; + } + + /** Scanner ID as a UTF-8 string; used as the key in {@code ScannerManager#scanners}. */ + String getIdString() { + return scannerId; + } + + public TableBucket getTableBucket() { + return tableBucket; + } + + public boolean isValid() { + return iterator.isValid() && remainingLimit != 0; + } + + public byte[] currentValue() { + return iterator.value(); + } + + /** Advances the cursor; must be called only when {@link #isValid()} is {@code true}. */ + public void advance() { + iterator.next(); + if (remainingLimit > 0) { + remainingLimit--; + } + } + + /** + * Surfaces RocksDB-internal iterator errors. {@link RocksIterator#next()} does not throw on + * such errors; instead the iterator silently transitions to invalid. Callers MUST invoke this + * once iteration has stopped, otherwise an internal failure would be indistinguishable from a + * clean end-of-range and the client would think the scan completed when rows were dropped. + * + * @throws KvStorageException if the iterator reports an internal error + */ + public void checkIteratorStatus() { + try { + iterator.status(); + } catch (RocksDBException e) { + throw new KvStorageException( + "RocksDB iterator error during scan for bucket " + tableBucket, e); + } + } + + /** Returns the call-sequence ID of the last successfully served request, or {@code -1}. */ + public int getCallSeqId() { + return callSeqId; + } + + /** + * Updates the call-sequence ID. Must be called after the response payload is fully + * prepared so a client retry with the same id can be detected. + */ + public void setCallSeqId(int callSeqId) { + this.callSeqId = callSeqId; + } + + /** Resets the idle-TTL timer to the given wall-clock time. */ + public void updateLastAccessTime(long nowMs) { + this.lastAccessTime = nowMs; + } + + /** Returns {@code true} if idle for longer than {@code ttlMs}. 
*/ + public boolean isExpired(long ttlMs, long nowMs) { + return nowMs - lastAccessTime > ttlMs; + } + + /** + * Tries to claim exclusive use of the cursor. Returns {@code false} if another thread already + * holds it, or if {@link #close()} has been initiated. Every successful acquire MUST be paired + * with {@link #releaseAfterUse()} in a {@code finally}, otherwise {@code close()} blocks. + */ + public boolean tryAcquireForUse() { + if (closed.get()) { + return false; + } + if (!inUse.compareAndSet(false, true)) { + return false; + } + // Re-check after the CAS in case close() flipped `closed` in between; let close() proceed. + if (closed.get()) { + inUse.set(false); + return false; + } + return true; + } + + /** + * Releases the flag obtained via {@link #tryAcquireForUse()}. MUST be called BEFORE any path + * that may trigger {@link #close()} on the same thread, otherwise {@code close()} self- + * deadlocks on its inUse fence. + */ + public void releaseAfterUse() { + inUse.set(false); + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + // force close the inUse fence so any racing tryAcquireForUse() calls + inUse.set(false); + IOUtils.closeQuietly(iterator); + IOUtils.closeQuietly(readOptions); + IOUtils.closeQuietly(() -> rocksDBKv.getDb().releaseSnapshot(snapshot)); + IOUtils.closeQuietly(snapshot); + IOUtils.closeQuietly(resourceLease); + } + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerManager.java new file mode 100644 index 0000000000..c6e834c0d6 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerManager.java @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv.scan; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.TooManyScannersException; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.server.replica.Replica; +import org.apache.fluss.utils.AutoCloseableAsync; +import org.apache.fluss.utils.clock.Clock; +import org.apache.fluss.utils.clock.SystemClock; +import org.apache.fluss.utils.concurrent.FutureUtils; +import org.apache.fluss.utils.concurrent.Scheduler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Manages server-side KV full-scan sessions ({@link ScannerContext}). Sessions are keyed by a + * server-assigned UUID and persist across multiple batched-fetch RPCs. + * + *

Concurrency limits are enforced both per-bucket and per-server, with a pre-check followed by + * an atomic increment + re-check (rolled back on contention) so the configured caps cannot be + * permanently breached. Sessions idle longer than {@code kv.scanner.ttl} are evicted by a + * background task; their IDs are retained for {@code 2 × ttl} so that {@link + * #isRecentlyExpired(byte[])} can distinguish expired sessions from unknown ones. + * + *

{@link #closeScannersForBucket(TableBucket)} must be called when a bucket loses leadership to + * release RocksDB resources promptly. + */ +@ThreadSafe +public class ScannerManager implements AutoCloseableAsync { + + private static final Logger LOG = LoggerFactory.getLogger(ScannerManager.class); + + /** Sentinel for read paths so a missing bucket entry does not allocate a counter. */ + private static final AtomicInteger ZERO = new AtomicInteger(0); + + private final Map scanners = new ConcurrentHashMap<>(); + private final Map recentlyExpiredIds = new ConcurrentHashMap<>(); + private final Map perBucketCount = new ConcurrentHashMap<>(); + private final AtomicInteger totalScanners = new AtomicInteger(0); + + private final AtomicBoolean closed = new AtomicBoolean(false); + + private final Clock clock; + private final long scannerTtlMs; + private final long recentlyExpiredRetentionMs; + private final int maxPerBucket; + private final int maxPerServer; + private final int maxBatchSizeBytes; + + @Nullable private ScheduledFuture evictorTask; + + public ScannerManager(Configuration conf, Scheduler scheduler) { + this(conf, scheduler, SystemClock.getInstance()); + } + + @VisibleForTesting + ScannerManager(Configuration conf, Scheduler scheduler, Clock clock) { + this.clock = clock; + this.scannerTtlMs = conf.get(ConfigOptions.KV_SCANNER_TTL).toMillis(); + this.recentlyExpiredRetentionMs = 2 * scannerTtlMs; + this.maxPerBucket = conf.get(ConfigOptions.KV_SCANNER_MAX_PER_BUCKET); + this.maxPerServer = conf.get(ConfigOptions.KV_SCANNER_MAX_PER_SERVER); + long configuredMaxBatch = conf.get(ConfigOptions.KV_SCANNER_MAX_BATCH_SIZE).getBytes(); + this.maxBatchSizeBytes = (int) Math.min(Integer.MAX_VALUE, configuredMaxBatch); + + long expirationIntervalMs = + conf.get(ConfigOptions.KV_SCANNER_EXPIRATION_INTERVAL).toMillis(); + this.evictorTask = + scheduler.schedule( + "scanner-expiration", + this::evictExpiredScanners, + expirationIntervalMs, + expirationIntervalMs); + + 
LOG.info( + "Started ScannerManager: ttl={}ms, expirationInterval={}ms, " + + "maxPerBucket={}, maxPerServer={}", + scannerTtlMs, + expirationIntervalMs, + maxPerBucket, + maxPerServer); + } + + /** + * Opens a new scan session against the given leader replica. The snapshot is taken under the + * replica's leader-ISR read lock; on {@link TooManyScannersException} the just-opened context + * is closed by {@link Replica#openScan} before the exception propagates. Returns an empty- + * bucket result (context = {@code null}) without consuming a slot when the bucket has no rows. + * + * @param limit optional row-count cap ({@code null} or ≤ 0 means unlimited) + * @throws TooManyScannersException if the per-bucket or per-server limit is exceeded + * @throws IOException if RocksDB is shutting down + */ + public OpenScanResult createScanner(Replica replica, @Nullable Long limit) throws IOException { + checkLimits(replica.getTableBucket()); + String scannerId = generateScannerId(); + return replica.openScan(this, scannerId, limit != null ? limit : -1L, clock.milliseconds()); + } + + /** + * Registers an already-opened {@link ScannerContext}, enforcing the per-bucket and per-server + * limits. The caller must close the context if {@link TooManyScannersException} is thrown. + */ + public void register(ScannerContext context) { + registerContext(context); + } + + /** + * Looks up a scanner session. Does not refresh the last-access timestamp — call {@link + * #markAccessed(ScannerContext)} only after the request has been fully validated, so malformed + * requests cannot extend the idle TTL of an orphan session. + */ + @Nullable + public ScannerContext getScanner(byte[] scannerId) { + return scanners.get(new String(scannerId, StandardCharsets.UTF_8)); + } + + /** Refreshes the last-access timestamp; call only when about to do real work. 
*/ + public void markAccessed(ScannerContext context) { + context.updateLastAccessTime(clock.milliseconds()); + } + + /** + * Returns {@code true} if the ID belongs to a session evicted within the last {@code 2 × + * ttlMs}; lets callers distinguish "expired" from "unknown". + */ + public boolean isRecentlyExpired(byte[] scannerId) { + return recentlyExpiredIds.containsKey(new String(scannerId, StandardCharsets.UTF_8)); + } + + /** + * Removes and closes the given scanner. Uses a conditional remove so the TTL evictor and an + * explicit close cannot both run the close path. + */ + public void removeScanner(ScannerContext context) { + if (scanners.remove(context.getIdString(), context)) { + decrementCounts(context.getTableBucket()); + closeScannerContext(context); + } + } + + /** Looks up and removes a scanner by raw ID bytes; no-op if not found. */ + public void removeScanner(byte[] scannerId) { + String key = new String(scannerId, StandardCharsets.UTF_8); + ScannerContext context = scanners.get(key); + if (context != null) { + removeScanner(context); + } + } + + /** Returns the max batch size in bytes allowed for a kv scan response. */ + public int getMaxBatchSizeBytes() { + return maxBatchSizeBytes; + } + + @VisibleForTesting + public int activeScannerCount() { + return totalScanners.get(); + } + + @VisibleForTesting + public int activeScannerCountForBucket(TableBucket tableBucket) { + return perBucketCount.getOrDefault(tableBucket, ZERO).get(); + } + + /** + * Closes all active scanner sessions for the bucket on a leadership change. Records the IDs in + * {@link #recentlyExpiredIds} so continuation RPCs after the close surface SCANNER_EXPIRED + * (recoverable) instead of UNKNOWN_SCANNER_ID. 
+ */ + public void closeScannersForBucket(TableBucket tableBucket) { + List toRemove = new ArrayList<>(); + for (Map.Entry entry : scanners.entrySet()) { + if (tableBucket.equals(entry.getValue().getTableBucket())) { + toRemove.add(entry.getValue()); + } + } + long now = clock.milliseconds(); + for (ScannerContext context : toRemove) { + LOG.info( + "Closing scanner {} for bucket {} due to leadership change.", + context.getIdString(), + tableBucket); + // Record before removal so a racing continuation never sees the ID as both unknown + // and not-recently-expired. + recentlyExpiredIds.put(context.getIdString(), now); + removeScanner(context); + } + perBucketCount.remove(tableBucket); + } + + /** Fast best-effort pre-check; the atomic re-check in {@link #registerContext} is the gate. */ + private void checkLimits(TableBucket tableBucket) { + if (totalScanners.get() >= maxPerServer) { + throw new TooManyScannersException( + String.format( + "Cannot create scanner for bucket %s: server-wide limit of %d reached.", + tableBucket, maxPerServer)); + } + if (perBucketCount.getOrDefault(tableBucket, ZERO).get() >= maxPerBucket) { + throw new TooManyScannersException( + String.format( + "Cannot create scanner for bucket %s: per-bucket limit of %d reached.", + tableBucket, maxPerBucket)); + } + } + + private void registerContext(ScannerContext context) { + TableBucket tableBucket = context.getTableBucket(); + + int newTotal = totalScanners.incrementAndGet(); + if (newTotal > maxPerServer) { + totalScanners.decrementAndGet(); + throw new TooManyScannersException( + String.format( + "Cannot create scanner for bucket %s: server-wide limit of %d reached.", + tableBucket, maxPerServer)); + } + + AtomicInteger bucketCount = + perBucketCount.computeIfAbsent(tableBucket, k -> new AtomicInteger(0)); + int newBucketCount = bucketCount.incrementAndGet(); + if (newBucketCount > maxPerBucket) { + bucketCount.decrementAndGet(); + totalScanners.decrementAndGet(); + throw new 
TooManyScannersException( + String.format( + "Cannot create scanner for bucket %s: per-bucket limit of %d reached.", + tableBucket, maxPerBucket)); + } + + scanners.put(context.getIdString(), context); + + LOG.debug( + "Registered scanner {} for bucket {} (total={}, perBucket={})", + context.getIdString(), + tableBucket, + newTotal, + newBucketCount); + } + + private void evictExpiredScanners() { + if (closed.get()) { + return; + } + long now = clock.milliseconds(); + + recentlyExpiredIds + .entrySet() + .removeIf(e -> now - e.getValue() > recentlyExpiredRetentionMs); + + for (Map.Entry entry : scanners.entrySet()) { + ScannerContext context = entry.getValue(); + if (context.isExpired(scannerTtlMs, now)) { + if (scanners.remove(entry.getKey(), context)) { + LOG.info( + "Evicted idle scanner {} for bucket {} (idle > {}ms).", + entry.getKey(), + context.getTableBucket(), + scannerTtlMs); + recentlyExpiredIds.put(entry.getKey(), now); + decrementCounts(context.getTableBucket()); + closeScannerContext(context); + } + } + } + } + + private void decrementCounts(TableBucket bucket) { + totalScanners.decrementAndGet(); + perBucketCount.computeIfPresent( + bucket, + (k, count) -> { + int remaining = count.decrementAndGet(); + return remaining <= 0 ? 
null : count; + }); + } + + private void closeScannerContext(ScannerContext context) { + try { + context.close(); + } catch (Exception e) { + LOG.warn( + "Error closing scanner {} for bucket {}.", + context.getIdString(), + context.getTableBucket(), + e); + } + } + + private static String generateScannerId() { + return UUID.randomUUID().toString().replace("-", ""); + } + + @Override + public CompletableFuture closeAsync() { + try { + close(); + return CompletableFuture.completedFuture(null); + } catch (Exception e) { + return FutureUtils.completedExceptionally(e); + } + } + + @Override + public void close() { + if (!closed.compareAndSet(false, true)) { + return; + } + + if (evictorTask != null) { + evictorTask.cancel(false); + evictorTask = null; + } + + for (Map.Entry entry : scanners.entrySet()) { + if (scanners.remove(entry.getKey(), entry.getValue())) { + decrementCounts(entry.getValue().getTableBucket()); + closeScannerContext(entry.getValue()); + } + } + recentlyExpiredIds.clear(); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java index 2259ec4c32..6a84316c39 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java @@ -31,6 +31,7 @@ import org.apache.fluss.exception.NonPrimaryKeyTableException; import org.apache.fluss.exception.NotEnoughReplicasException; import org.apache.fluss.exception.NotLeaderOrFollowerException; +import org.apache.fluss.exception.TooManyScannersException; import org.apache.fluss.fs.FsPath; import org.apache.fluss.metadata.ChangelogImage; import org.apache.fluss.metadata.LogFormat; @@ -61,6 +62,9 @@ import org.apache.fluss.server.kv.RemoteLogFetcher; import org.apache.fluss.server.kv.autoinc.AutoIncIDRange; import org.apache.fluss.server.kv.rocksdb.RocksDBKvBuilder; +import 
org.apache.fluss.server.kv.scan.OpenScanResult; +import org.apache.fluss.server.kv.scan.ScannerContext; +import org.apache.fluss.server.kv.scan.ScannerManager; import org.apache.fluss.server.kv.snapshot.CompletedKvSnapshotCommitter; import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; import org.apache.fluss.server.kv.snapshot.KvFileHandleAndLocalPath; @@ -211,6 +215,13 @@ public final class Replica { private volatile @Nullable CloseableRegistry closeableRegistryForKv; private @Nullable PeriodicSnapshotManager kvSnapshotManager; + /** + * Server-wide {@link ScannerManager}. Active sessions for this bucket are closed in {@link + * #dropKv()} under the {@code leaderIsrUpdateLock} write lock; {@link #openScan} registers + * under the read lock, so registration cannot race a leadership flip. + */ + private final ScannerManager scannerManager; + // ------- metrics private Counter isrShrinks; private Counter isrExpands; @@ -236,7 +247,8 @@ public Replica( BucketMetricGroup bucketMetricGroup, TableInfo tableInfo, Clock clock, - RemoteLogManager remoteLogManager) + RemoteLogManager remoteLogManager, + ScannerManager scannerManager) throws Exception { this.physicalPath = physicalPath; this.tableBucket = tableBucket; @@ -269,6 +281,7 @@ public Replica( this.logTablet.updateIsDataLakeEnabled(tableConfig.isDataLakeEnabled()); this.clock = clock; this.remoteLogManager = remoteLogManager; + this.scannerManager = checkNotNull(scannerManager, "scannerManager"); registerMetrics(); } @@ -702,16 +715,16 @@ private void createKv() { } private void dropKv() { - // close any closeable registry for kv + // Release scanner leases first; otherwise resourceGuard.close() inside kvTablet.close() + // blocks waiting for them. Runs under leaderIsrUpdateLock(W), so no concurrent register. 
+ scannerManager.closeScannersForBucket(tableBucket); + if (closeableRegistry.unregisterCloseable(closeableRegistryForKv)) { IOUtils.closeQuietly(closeableRegistryForKv); } if (kvTablet != null) { - // Unregister RocksDB statistics before dropping KvTablet - // This ensures statistics are cleaned up when KvTablet is destroyed bucketMetricGroup.unregisterRocksDBStatistics(); - // drop the kv tablet checkNotNull(kvManager); kvManager.dropKv(tableBucket); kvTablet = null; @@ -1377,6 +1390,52 @@ public DefaultValueRecordBatch limitKvScan(int limit) { }); } + /** + * Opens a new full-scan session against this replica's KV store. The leader check, snapshot + * open, and {@link ScannerManager#register} all happen under {@code leaderIsrUpdateLock(R)}, so + * a leadership flip cannot leave a scanner registered for a non-leader bucket. + * + * @param limit row-count cap ({@code ≤ 0} means unlimited) + * @throws NonPrimaryKeyTableException if this replica is not a primary-key table + * @throws NotLeaderOrFollowerException if this replica is not the leader + * @throws TooManyScannersException if scanner limits are exceeded + * @throws IOException if RocksDB is shutting down + */ + public OpenScanResult openScan( + ScannerManager scannerManager, String scannerId, long limit, long initialAccessTimeMs) + throws IOException { + if (!isKvTable()) { + throw new NonPrimaryKeyTableException( + "the primary key table not exists for " + tableBucket); + } + + return inReadLock( + leaderIsrUpdateLock, + () -> { + if (!isLeader()) { + throw new NotLeaderOrFollowerException( + String.format( + "Leader not local for bucket %s on tabletServer %d", + tableBucket, localTabletServerId)); + } + checkNotNull( + kvTablet, "KvTablet for the replica to open scan shouldn't be null."); + OpenScanResult result = + kvTablet.openScan(scannerId, limit, initialAccessTimeMs); + ScannerContext context = result.getContext(); + if (context == null) { + return result; + } + try { + 
scannerManager.register(context); + } catch (TooManyScannersException e) { + IOUtils.closeQuietly(context); + throw e; + } + return result; + }); + } + public LogRecords limitLogScan(int limit) { return inReadLock( leaderIsrUpdateLock, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index 3b847266eb..f2a01cbe95 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java @@ -79,6 +79,7 @@ import org.apache.fluss.server.entity.UserContext; import org.apache.fluss.server.kv.KvManager; import org.apache.fluss.server.kv.KvSnapshotResource; +import org.apache.fluss.server.kv.scan.ScannerManager; import org.apache.fluss.server.kv.snapshot.CompletedKvSnapshotCommitter; import org.apache.fluss.server.kv.snapshot.DefaultSnapshotContext; import org.apache.fluss.server.log.FetchDataInfo; @@ -209,6 +210,8 @@ public class ReplicaManager implements ServerReconfigurable { private final Clock clock; + private final ScannerManager scannerManager; + public ReplicaManager( Configuration conf, Scheduler scheduler, @@ -223,6 +226,7 @@ public ReplicaManager( FatalErrorHandler fatalErrorHandler, TabletServerMetricGroup serverMetricGroup, UserMetrics userMetrics, + ScannerManager scannerManager, Clock clock, ExecutorService ioExecutor) throws IOException { @@ -241,6 +245,7 @@ public ReplicaManager( serverMetricGroup, userMetrics, new RemoteLogManager(conf, zkClient, coordinatorGateway, clock, ioExecutor), + scannerManager, clock, ioExecutor); } @@ -261,6 +266,7 @@ public ReplicaManager( TabletServerMetricGroup serverMetricGroup, UserMetrics userMetrics, RemoteLogManager remoteLogManager, + ScannerManager scannerManager, Clock clock, ExecutorService ioExecutor) throws IOException { @@ -310,6 +316,7 @@ public ReplicaManager( this.clock = clock; 
this.ioExecutor = ioExecutor; this.minInSyncReplicas = conf.get(ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER); + this.scannerManager = checkNotNull(scannerManager, "scannerManager"); registerMetrics(); } @@ -1155,6 +1162,7 @@ private void makeFollowers( Replica replica = getReplicaOrException(data.getTableBucket()); if (replica.makeFollower(data)) { replicasBecomeFollower.add(replica); + scannerManager.closeScannersForBucket(tb); } // stop the remote log tiering tasks for followers remoteLogManager.stopLogTiering(replica); @@ -1833,6 +1841,8 @@ private StopReplicaResultForBucket stopReplica( // First stop fetchers for this table bucket. replicaFetcherManager.removeFetcherForBuckets(Collections.singleton(tb)); + scannerManager.closeScannersForBucket(tb); + HostedReplica replica = getReplica(tb); if (replica instanceof OnlineReplica) { Replica replicaToDelete = ((OnlineReplica) replica).getReplica(); @@ -1945,7 +1955,8 @@ protected Optional maybeCreateReplica(NotifyLeaderAndIsrData data) { bucketMetricGroup, tableInfo, clock, - remoteLogManager); + remoteLogManager, + scannerManager); allReplicas.put(tb, new OnlineReplica(replica)); replicaOpt = Optional.of(replica); } else if (hostedReplica instanceof OnlineReplica) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java index 988f7565ac..d0f57db640 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java @@ -40,6 +40,7 @@ import org.apache.fluss.server.coordinator.LakeCatalogDynamicLoader; import org.apache.fluss.server.coordinator.MetadataManager; import org.apache.fluss.server.kv.KvManager; +import org.apache.fluss.server.kv.scan.ScannerManager; import org.apache.fluss.server.kv.snapshot.DefaultCompletedKvSnapshotCommitter; import org.apache.fluss.server.log.LogManager; import 
org.apache.fluss.server.log.remote.RemoteLogManager; @@ -145,6 +146,9 @@ public class TabletServer extends ServerBase { @GuardedBy("lock") private ReplicaManager replicaManager; + @GuardedBy("lock") + private ScannerManager scannerManager; + @GuardedBy("lock") private @Nullable RemoteLogManager remoteLogManager = null; @@ -254,6 +258,8 @@ protected void startServices() throws Exception { conf.get(ConfigOptions.SERVER_IO_POOL_SIZE), new ExecutorThreadFactory("tablet-server-io")); + this.scannerManager = new ScannerManager(conf, scheduler); + this.replicaManager = new ReplicaManager( conf, @@ -270,6 +276,7 @@ protected void startServices() throws Exception { this, tabletServerMetricGroup, userMetrics, + scannerManager, clock, ioExecutor); replicaManager.startup(); @@ -291,7 +298,8 @@ protected void startServices() throws Exception { metadataManager, authorizer, dynamicConfigManager, - ioExecutor); + ioExecutor, + scannerManager); RequestsMetrics requestsMetrics = RequestsMetrics.createTabletServerRequestMetrics(tabletServerMetricGroup); @@ -433,6 +441,10 @@ CompletableFuture stopServices() { scheduler.shutdown(); } + if (scannerManager != null) { + scannerManager.close(); + } + if (kvManager != null) { kvManager.shutdown(); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java index 08536adfd9..88731daaba 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java @@ -19,10 +19,15 @@ import org.apache.fluss.cluster.ServerType; import org.apache.fluss.exception.AuthorizationException; +import org.apache.fluss.exception.InvalidScanRequestException; +import org.apache.fluss.exception.NotLeaderOrFollowerException; +import org.apache.fluss.exception.ScannerExpiredException; +import org.apache.fluss.exception.UnknownScannerIdException; import 
org.apache.fluss.exception.UnknownTableOrBucketException; import org.apache.fluss.fs.FileSystem; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.record.DefaultValueRecordBatch; import org.apache.fluss.record.KvRecordBatch; import org.apache.fluss.record.MemoryLogRecords; import org.apache.fluss.rpc.entity.FetchLogResultForBucket; @@ -52,6 +57,7 @@ import org.apache.fluss.rpc.messages.NotifyLeaderAndIsrResponse; import org.apache.fluss.rpc.messages.NotifyRemoteLogOffsetsRequest; import org.apache.fluss.rpc.messages.NotifyRemoteLogOffsetsResponse; +import org.apache.fluss.rpc.messages.PbScanReqForBucket; import org.apache.fluss.rpc.messages.PrefixLookupRequest; import org.apache.fluss.rpc.messages.PrefixLookupResponse; import org.apache.fluss.rpc.messages.ProduceLogRequest; @@ -76,12 +82,16 @@ import org.apache.fluss.server.entity.FetchReqInfo; import org.apache.fluss.server.entity.NotifyLeaderAndIsrData; import org.apache.fluss.server.entity.UserContext; +import org.apache.fluss.server.kv.scan.OpenScanResult; +import org.apache.fluss.server.kv.scan.ScannerContext; +import org.apache.fluss.server.kv.scan.ScannerManager; import org.apache.fluss.server.log.FetchParams; import org.apache.fluss.server.log.FetchParamsBuilder; import org.apache.fluss.server.log.FilterInfo; import org.apache.fluss.server.log.ListOffsetsParam; import org.apache.fluss.server.metadata.TabletServerMetadataCache; import org.apache.fluss.server.metadata.TabletServerMetadataProvider; +import org.apache.fluss.server.replica.Replica; import org.apache.fluss.server.replica.ReplicaManager; import org.apache.fluss.server.utils.ServerRpcMessageUtils; import org.apache.fluss.server.zk.ZooKeeperClient; @@ -137,6 +147,7 @@ public final class TabletService extends RpcServiceBase implements TabletServerG private final ReplicaManager replicaManager; private final TabletServerMetadataCache metadataCache; private final 
TabletServerMetadataProvider metadataFunctionProvider; + private final ScannerManager scannerManager; public TabletService( int serverId, @@ -147,7 +158,8 @@ public TabletService( MetadataManager metadataManager, @Nullable Authorizer authorizer, DynamicConfigManager dynamicConfigManager, - ExecutorService ioExecutor) { + ExecutorService ioExecutor, + ScannerManager scannerManager) { super( remoteFileSystem, ServerType.TABLET_SERVER, @@ -161,6 +173,7 @@ public TabletService( this.metadataCache = metadataCache; this.metadataFunctionProvider = new TabletServerMetadataProvider(zkClient, metadataManager, metadataCache); + this.scannerManager = scannerManager; } @Override @@ -435,7 +448,201 @@ public CompletableFuture notifyLakeTableOffset( @Override public CompletableFuture scanKv(ScanKvRequest request) { - return null; + ScanKvResponse response = new ScanKvResponse(); + ScannerContext openedContext = null; + ScannerContext acquiredContext = null; + try { + if (request.hasBucketScanReq() && request.hasScannerId()) { + throw new InvalidScanRequestException( + "ScanKvRequest must not set both bucket_scan_req and scanner_id."); + } + if (request.hasBucketScanReq() + && request.hasCloseScanner() + && request.isCloseScanner()) { + throw new InvalidScanRequestException( + "ScanKvRequest must not set close_scanner together with bucket_scan_req."); + } + if (!request.hasBucketScanReq() && !request.hasScannerId()) { + throw new InvalidScanRequestException( + "ScanKvRequest must have either bucket_scan_req (new scan) " + + "or scanner_id (continuation)."); + } + + boolean isCloseRequest = request.hasCloseScanner() && request.isCloseScanner(); + + // Validate batch_size_bytes up-front so malformed requests never open a snapshot. 
+ int effectiveBatchSize = 0; + if (!isCloseRequest) { + if (!request.hasBatchSizeBytes()) { + throw new InvalidScanRequestException( + "batch_size_bytes is required for data-fetching scan requests."); + } + int requestedBatchSize = request.getBatchSizeBytes(); + if (requestedBatchSize <= 0) { + throw new InvalidScanRequestException( + "batch_size_bytes must be greater than 0."); + } + effectiveBatchSize = + Math.min(requestedBatchSize, scannerManager.getMaxBatchSizeBytes()); + } + + ScannerContext context; + + boolean isNewScan = false; + long initialLogOffset = 0L; + + if (request.hasBucketScanReq()) { + PbScanReqForBucket bucketReq = request.getBucketScanReq(); + long tableId = bucketReq.getTableId(); + authorizeTable(READ, tableId); + + TableBucket tableBucket = + new TableBucket( + tableId, + bucketReq.hasPartitionId() ? bucketReq.getPartitionId() : null, + bucketReq.getBucketId()); + Long limit = bucketReq.hasLimit() ? bucketReq.getLimit() : null; + + OpenScanResult openResult = + scannerManager.createScanner( + replicaManager.getReplicaOrException(tableBucket), limit); + isNewScan = true; + initialLogOffset = openResult.getLogOffset(); + + context = openResult.getContext(); + if (context == null) { + // Empty bucket: no session registered; still return the captured offset. + response.setHasMoreResults(false); + response.setLogOffset(initialLogOffset); + return CompletableFuture.completedFuture(response); + } + openedContext = context; + } else { + byte[] scannerId = request.getScannerId(); + context = scannerManager.getScanner(scannerId); + if (context == null) { + if (isCloseRequest) { + response.setScannerId(scannerId); + response.setHasMoreResults(false); + return CompletableFuture.completedFuture(response); + } + if (scannerManager.isRecentlyExpired(scannerId)) { + throw new ScannerExpiredException( + "Scanner session has expired due to inactivity. " + + "Please start a new scan."); + } else { + throw new UnknownScannerIdException( + "Unknown scanner ID. 
The session may have expired or " + + "never existed."); + } + } + openedContext = context; + } + + // Cursor-exclusion CAS: serialises concurrent same-scannerId RPCs and rejects if + // close() has begun. + if (!context.tryAcquireForUse()) { + throw new InvalidScanRequestException( + String.format( + "Concurrent scan request on scanner ID for bucket %s, or session " + + "is closing; only one in-flight scanKv RPC per scanner " + + "is allowed.", + context.getTableBucket())); + } + acquiredContext = context; + + if (!request.hasBucketScanReq() && request.hasCallSeqId()) { + long expectedSeqId = (long) context.getCallSeqId() + 1L; + int requestSeqId = request.getCallSeqId(); + if ((long) requestSeqId != expectedSeqId) { + throw new InvalidScanRequestException( + String.format( + "Out-of-order scan request: expected callSeqId=%d but got %d.", + expectedSeqId, requestSeqId)); + } + } + + // Honour close even on a non-leader: the local session is still ours to release. + if (isCloseRequest) { + response.setScannerId(context.getScannerId()); + response.setHasMoreResults(false); + return CompletableFuture.completedFuture(response); + } + + // Catch a leadership flip ahead of the eventual closeScannersForBucket callback so + // the client can redirect rather than consume a stale snapshot. + if (!request.hasBucketScanReq()) { + Replica replica = replicaManager.getReplicaOrException(context.getTableBucket()); + if (!replica.isLeader()) { + throw new NotLeaderOrFollowerException( + String.format( + "Leader is no longer local for bucket %s; client should " + + "restart the scan against the new leader.", + context.getTableBucket())); + } + } + + // Refresh TTL only now that the request is fully validated. + scannerManager.markAccessed(context); + + // Gate on builder.sizeInBytes() (not raw bytes) so the threshold reflects the + // serialised batch. Always append at least one record so a tiny effectiveBatchSize + // cannot produce an empty has_more=true response. 
+ DefaultValueRecordBatch.Builder builder = DefaultValueRecordBatch.builder(); + boolean appendedAny = false; + while (context.isValid() + && (!appendedAny || builder.sizeInBytes() < effectiveBatchSize)) { + builder.append(context.currentValue()); + context.advance(); + appendedAny = true; + } + + boolean hasMore = context.isValid(); + if (!hasMore) { + // RocksIterator.next() does not throw on internal errors; an unchecked status + // would silently truncate the scan and report has_more=false to the client. + context.checkIteratorStatus(); + } + DefaultValueRecordBatch batch = builder.build(); + + response.setScannerId(context.getScannerId()); + response.setHasMoreResults(hasMore); + if (batch.sizeInBytes() > 0) { + response.setRecords(batch.getSegment(), batch.getPosition(), batch.sizeInBytes()); + } + if (isNewScan) { + response.setLogOffset(initialLogOffset); + } + + // Update callSeqId AFTER the response is prepared so a duplicate retry can be + // detected via the in-order check. + if (request.hasCallSeqId()) { + context.setCallSeqId(request.getCallSeqId()); + } + + // Keep the session alive only if there's more to read; otherwise leave openedContext + // set so finally drains it. + if (hasMore) { + openedContext = null; + } + + } catch (Exception e) { + if (e instanceof InterruptedException || e.getCause() instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + ApiError apiError = ApiError.fromThrowable(e); + response.setErrorCode(apiError.error().code()); + response.setErrorMessage(apiError.message() != null ? 
apiError.message() : ""); + } finally { + if (acquiredContext != null) { + acquiredContext.releaseAfterUse(); + } + if (openedContext != null) { + scannerManager.removeScanner(openedContext); + } + } + + return CompletableFuture.completedFuture(response); } @Override diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java index fd08eeb4aa..1f2d5a89fa 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java @@ -57,6 +57,8 @@ import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.Value; import org.apache.fluss.server.kv.rocksdb.RocksDBStatistics; import org.apache.fluss.server.kv.rowmerger.RowMerger; +import org.apache.fluss.server.kv.scan.OpenScanResult; +import org.apache.fluss.server.kv.scan.ScannerContext; import org.apache.fluss.server.log.FetchIsolation; import org.apache.fluss.server.log.LogAppendInfo; import org.apache.fluss.server.log.LogTablet; @@ -1857,4 +1859,139 @@ void testRowCountWithMixedOperations() throws Exception { kvTablet.close(); } + + @Test + void testOpenScan_emptyBucket_returnsNullContext() throws Exception { + initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>()); + OpenScanResult result = kvTablet.openScan("scanner-empty", -1L, 0L); + assertThat(result).isNotNull(); + assertThat(result.getContext()).isNull(); + assertThat(result.getLogOffset()).isGreaterThanOrEqualTo(0L); + } + + @Test + void testOpenScan_returnsAllRows() throws Exception { + initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>()); + + int numRows = 5; + List rows = new ArrayList<>(); + for (int i = 0; i < numRows; i++) { + rows.add( + kvRecordFactory.ofRecord( + String.valueOf(i).getBytes(), new Object[] {i, "v" + i})); + } + kvTablet.putAsLeader(kvRecordBatchFactory.ofRecords(rows), null); + kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE); + + 
OpenScanResult result = kvTablet.openScan("scanner-all", -1L, 0L); + ScannerContext context = result.getContext(); + assertThat(context).isNotNull(); + assertThat(result.getLogOffset()).isEqualTo(context.getLogOffset()); + + int count = 0; + while (context.isValid()) { + assertThat(context.currentValue()).isNotNull(); + context.advance(); + count++; + } + context.close(); + + assertThat(count).isEqualTo(numRows); + } + + /** Snapshot isolation: rows written after openScan must not appear in the scan. */ + @Test + void testOpenScan_snapshotIsolation() throws Exception { + initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>()); + + List initialRows = + Arrays.asList( + kvRecordFactory.ofRecord("0".getBytes(), new Object[] {0, "v0"}), + kvRecordFactory.ofRecord("1".getBytes(), new Object[] {1, "v1"}), + kvRecordFactory.ofRecord("2".getBytes(), new Object[] {2, "v2"})); + kvTablet.putAsLeader(kvRecordBatchFactory.ofRecords(initialRows), null); + kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE); + + ScannerContext context = kvTablet.openScan("scanner-snap", -1L, 0L).getContext(); + assertThat(context).isNotNull(); + + List lateRows = + Arrays.asList( + kvRecordFactory.ofRecord("3".getBytes(), new Object[] {3, "v3"}), + kvRecordFactory.ofRecord("4".getBytes(), new Object[] {4, "v4"})); + kvTablet.putAsLeader(kvRecordBatchFactory.ofRecords(lateRows), null); + kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE); + + int count = 0; + while (context.isValid()) { + context.advance(); + count++; + } + context.close(); + + assertThat(count).isEqualTo(3); + } + + @Test + void testOpenScan_withLimit() throws Exception { + initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>()); + + int numRows = 5; + List rows = new ArrayList<>(); + for (int i = 0; i < numRows; i++) { + rows.add( + kvRecordFactory.ofRecord( + String.valueOf(i).getBytes(), new Object[] {i, "v" + i})); + } + kvTablet.putAsLeader(kvRecordBatchFactory.ofRecords(rows), null); + 
kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE); + + long limit = 3L; + ScannerContext context = kvTablet.openScan("scanner-limit", limit, 0L).getContext(); + assertThat(context).isNotNull(); + + int count = 0; + while (context.isValid()) { + context.advance(); + count++; + } + context.close(); + + assertThat(count).isEqualTo((int) limit); + } + + @Test + void testOpenScan_multipleSessionsIndependent() throws Exception { + initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>()); + + List rows = + Arrays.asList( + kvRecordFactory.ofRecord("0".getBytes(), new Object[] {0, "v0"}), + kvRecordFactory.ofRecord("1".getBytes(), new Object[] {1, "v1"}), + kvRecordFactory.ofRecord("2".getBytes(), new Object[] {2, "v2"})); + kvTablet.putAsLeader(kvRecordBatchFactory.ofRecords(rows), null); + kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE); + + ScannerContext ctx1 = kvTablet.openScan("scanner-a", -1L, 0L).getContext(); + ScannerContext ctx2 = kvTablet.openScan("scanner-b", -1L, 0L).getContext(); + assertThat(ctx1).isNotNull(); + assertThat(ctx2).isNotNull(); + + int count1 = 0; + while (ctx1.isValid()) { + ctx1.advance(); + count1++; + } + ctx1.close(); + + int count2 = 0; + while (ctx2.isValid()) { + ctx2.advance(); + count2++; + } + ctx2.close(); + + assertThat(count1).isEqualTo(3); + assertThat(count2).isEqualTo(3); + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/scan/ScannerManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/scan/ScannerManagerTest.java new file mode 100644 index 0000000000..c04a907b32 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/scan/ScannerManagerTest.java @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv.scan; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.config.TableConfig; +import org.apache.fluss.exception.TooManyScannersException; +import org.apache.fluss.memory.TestingMemorySegmentPool; +import org.apache.fluss.metadata.KvFormat; +import org.apache.fluss.metadata.LogFormat; +import org.apache.fluss.metadata.PhysicalTablePath; +import org.apache.fluss.metadata.SchemaInfo; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.record.KvRecord; +import org.apache.fluss.record.KvRecordTestUtils; +import org.apache.fluss.record.TestData; +import org.apache.fluss.record.TestingSchemaGetter; +import org.apache.fluss.server.kv.KvManager; +import org.apache.fluss.server.kv.KvTablet; +import org.apache.fluss.server.kv.autoinc.AutoIncrementManager; +import org.apache.fluss.server.kv.autoinc.TestingSequenceGeneratorFactory; +import org.apache.fluss.server.kv.rowmerger.RowMerger; +import org.apache.fluss.server.log.LogTablet; +import org.apache.fluss.server.log.LogTestUtils; +import org.apache.fluss.server.metrics.group.TestingMetricGroups; +import org.apache.fluss.server.zk.NOPErrorHandler; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator; +import org.apache.fluss.utils.clock.ManualClock; 
+import org.apache.fluss.utils.concurrent.FlussScheduler; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION; +import static org.apache.fluss.record.TestData.DATA1_SCHEMA_PK; +import static org.apache.fluss.testutils.common.CommonTestUtils.retry; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link ScannerManager}. */ +class ScannerManagerTest { + + private static final short SCHEMA_ID = 1; + + @TempDir File tempLogDir; + @TempDir File tmpKvDir; + + private final Configuration conf = new Configuration(); + private final KvRecordTestUtils.KvRecordBatchFactory kvRecordBatchFactory = + KvRecordTestUtils.KvRecordBatchFactory.of(SCHEMA_ID); + private final KvRecordTestUtils.KvRecordFactory kvRecordFactory = + KvRecordTestUtils.KvRecordFactory.of(TestData.DATA1_ROW_TYPE); + + private ManualClock clock; + private FlussScheduler scheduler; + private LogTablet logTablet; + private KvTablet kvTablet; + + @BeforeEach + void setUp() throws Exception { + clock = new ManualClock(0); + scheduler = new FlussScheduler(1); + scheduler.startup(); + + PhysicalTablePath physicalTablePath = PhysicalTablePath.of(TablePath.of("testDb", "t1")); + TestingSchemaGetter schemaGetter = + new TestingSchemaGetter(new SchemaInfo(DATA1_SCHEMA_PK, SCHEMA_ID)); + + File logTabletDir = + LogTestUtils.makeRandomLogTabletDir( + tempLogDir, + physicalTablePath.getDatabaseName(), + 0L, + physicalTablePath.getTableName()); + logTablet = + LogTablet.create( + physicalTablePath, + logTabletDir, + conf, + TestingMetricGroups.TABLET_SERVER_METRICS, + 0, + new FlussScheduler(1), 
+ LogFormat.ARROW, + 1, + true, + org.apache.fluss.utils.clock.SystemClock.getInstance(), + true); + + TableBucket tableBucket = logTablet.getTableBucket(); + TableConfig tableConf = new TableConfig(new Configuration()); + RowMerger rowMerger = RowMerger.create(tableConf, KvFormat.COMPACTED, schemaGetter); + AutoIncrementManager autoIncrementManager = + new AutoIncrementManager( + schemaGetter, + physicalTablePath.getTablePath(), + tableConf, + new TestingSequenceGeneratorFactory()); + + kvTablet = + KvTablet.create( + physicalTablePath, + tableBucket, + logTablet, + tmpKvDir, + conf, + TestingMetricGroups.TABLET_SERVER_METRICS, + new RootAllocator(Long.MAX_VALUE), + new TestingMemorySegmentPool(10 * 1024), + KvFormat.COMPACTED, + rowMerger, + DEFAULT_COMPRESSION, + schemaGetter, + tableConf.getChangelogImage(), + KvManager.getDefaultRateLimiter(), + autoIncrementManager); + } + + @AfterEach + void tearDown() throws Exception { + if (kvTablet != null) { + kvTablet.close(); + } + if (logTablet != null) { + logTablet.close(); + } + if (scheduler != null) { + scheduler.shutdown(); + } + } + + private ScannerManager createManager() { + Configuration c = new Configuration(); + c.set(ConfigOptions.KV_SCANNER_TTL, Duration.ofHours(1)); + c.set(ConfigOptions.KV_SCANNER_EXPIRATION_INTERVAL, Duration.ofHours(1)); + c.set(ConfigOptions.KV_SCANNER_MAX_PER_BUCKET, 8); + c.set(ConfigOptions.KV_SCANNER_MAX_PER_SERVER, 200); + return new ScannerManager(c, scheduler, clock); + } + + private ScannerManager createManager(int maxPerBucket, int maxPerServer) { + Configuration c = new Configuration(); + c.set(ConfigOptions.KV_SCANNER_TTL, Duration.ofHours(1)); + c.set(ConfigOptions.KV_SCANNER_EXPIRATION_INTERVAL, Duration.ofHours(1)); + c.set(ConfigOptions.KV_SCANNER_MAX_PER_BUCKET, maxPerBucket); + c.set(ConfigOptions.KV_SCANNER_MAX_PER_SERVER, maxPerServer); + return new ScannerManager(c, scheduler, clock); + } + + private ScannerManager createManagerWithShortTtl(long ttlMs, long 
expirationIntervalMs) { + Configuration c = new Configuration(); + c.set(ConfigOptions.KV_SCANNER_TTL, Duration.ofMillis(ttlMs)); + c.set( + ConfigOptions.KV_SCANNER_EXPIRATION_INTERVAL, + Duration.ofMillis(expirationIntervalMs)); + c.set(ConfigOptions.KV_SCANNER_MAX_PER_BUCKET, 8); + c.set(ConfigOptions.KV_SCANNER_MAX_PER_SERVER, 200); + return new ScannerManager(c, scheduler, clock); + } + + /** Opens a scanner directly via KvTablet and registers it; mirrors Replica#openScan. */ + private ScannerContext openAndRegister(ScannerManager manager) throws Exception { + ScannerContext ctx = + kvTablet.openScan(java.util.UUID.randomUUID().toString(), -1L, clock.milliseconds()) + .getContext(); + if (ctx == null) { + return null; + } + try { + manager.register(ctx); + } catch (RuntimeException e) { + ctx.close(); + throw e; + } + return ctx; + } + + private void putAndFlush(int count) throws Exception { + List rows = new ArrayList<>(); + for (int i = 0; i < count; i++) { + rows.add( + kvRecordFactory.ofRecord( + String.valueOf(i).getBytes(), new Object[] {i, "v" + i})); + } + kvTablet.putAsLeader(kvRecordBatchFactory.ofRecords(rows), null); + kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE); + } + + @Test + void testCreateScanner_emptyBucket_returnsNull() throws Exception { + try (ScannerManager manager = createManager()) { + ScannerContext context = openAndRegister(manager); + assertThat(context).isNull(); + assertThat(manager.activeScannerCount()).isEqualTo(0); + } + } + + @Test + void testCreateAndRemoveScanner() throws Exception { + putAndFlush(3); + try (ScannerManager manager = createManager()) { + TableBucket tableBucket = kvTablet.getTableBucket(); + + ScannerContext context = openAndRegister(manager); + assertThat(context).isNotNull(); + assertThat(manager.activeScannerCount()).isEqualTo(1); + assertThat(manager.activeScannerCountForBucket(tableBucket)).isEqualTo(1); + + manager.removeScanner(context); + 
assertThat(manager.activeScannerCount()).isEqualTo(0); + assertThat(manager.activeScannerCountForBucket(tableBucket)).isEqualTo(0); + } + } + + @Test + void testGetScanner_doesNotRefreshLastAccessTime() throws Exception { + putAndFlush(3); + try (ScannerManager manager = createManager()) { + ScannerContext context = openAndRegister(manager); + assertThat(context).isNotNull(); + byte[] scannerId = context.getScannerId(); + + clock.advanceTime(5000, TimeUnit.MILLISECONDS); + ScannerContext fetched = manager.getScanner(scannerId); + assertThat(fetched).isSameAs(context); + + assertThat(context.isExpired(1000L, clock.milliseconds())).isTrue(); + + manager.removeScanner(context); + } + } + + @Test + void testMarkAccessed_refreshesLastAccessTime() throws Exception { + putAndFlush(3); + try (ScannerManager manager = createManager()) { + ScannerContext context = openAndRegister(manager); + assertThat(context).isNotNull(); + + clock.advanceTime(5000, TimeUnit.MILLISECONDS); + manager.markAccessed(context); + + assertThat(context.isExpired(3_600_000L, clock.milliseconds())).isFalse(); + + manager.removeScanner(context); + } + } + + @Test + void testTryAcquireForUse_serialisesConcurrentClaims() throws Exception { + putAndFlush(3); + try (ScannerManager manager = createManager()) { + ScannerContext context = openAndRegister(manager); + assertThat(context).isNotNull(); + + assertThat(context.tryAcquireForUse()).isTrue(); + assertThat(context.tryAcquireForUse()).isFalse(); + + context.releaseAfterUse(); + + assertThat(context.tryAcquireForUse()).isTrue(); + context.releaseAfterUse(); + + manager.removeScanner(context); + } + } + + @Test + void testTryAcquireForUse_exactlyOneWinnerUnderContention() throws Exception { + putAndFlush(3); + try (ScannerManager manager = createManager()) { + ScannerContext context = openAndRegister(manager); + assertThat(context).isNotNull(); + + int threadCount = 16; + java.util.concurrent.CountDownLatch start = new 
java.util.concurrent.CountDownLatch(1); + java.util.concurrent.atomic.AtomicInteger winners = + new java.util.concurrent.atomic.AtomicInteger(0); + java.util.concurrent.ExecutorService pool = + java.util.concurrent.Executors.newFixedThreadPool(threadCount); + try { + for (int i = 0; i < threadCount; i++) { + pool.submit( + () -> { + try { + start.await(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + return; + } + if (context.tryAcquireForUse()) { + winners.incrementAndGet(); + } + }); + } + start.countDown(); + pool.shutdown(); + assertThat(pool.awaitTermination(10, TimeUnit.SECONDS)).isTrue(); + } finally { + pool.shutdownNow(); + } + + assertThat(winners.get()).isEqualTo(1); + context.releaseAfterUse(); + manager.removeScanner(context); + } + } + + @Test + void testCheckIteratorStatus_healthyIteratorIsNoOp() throws Exception { + putAndFlush(3); + try (ScannerManager manager = createManager()) { + ScannerContext context = openAndRegister(manager); + assertThat(context).isNotNull(); + + while (context.isValid()) { + context.advance(); + } + + context.checkIteratorStatus(); + + manager.removeScanner(context); + } + } + + @Test + void testTtlEviction() throws Exception { + putAndFlush(3); + ScannerManager manager = createManagerWithShortTtl(200, 200); + try { + ScannerContext context = openAndRegister(manager); + assertThat(context).isNotNull(); + byte[] scannerId = context.getScannerId(); + + clock.advanceTime(500, TimeUnit.MILLISECONDS); + + retry( + Duration.ofSeconds(10), + () -> assertThat(manager.activeScannerCount()).isEqualTo(0)); + assertThat(manager.getScanner(scannerId)).isNull(); + assertThat(manager.isRecentlyExpired(scannerId)).isTrue(); + } finally { + manager.close(); + } + } + + @Test + void testPerBucketLimit() throws Exception { + putAndFlush(3); + try (ScannerManager manager = createManager(2, 200)) { + TableBucket tableBucket = kvTablet.getTableBucket(); + + ScannerContext ctx1 = openAndRegister(manager); + 
ScannerContext ctx2 = openAndRegister(manager); + assertThat(manager.activeScannerCountForBucket(tableBucket)).isEqualTo(2); + + assertThatThrownBy(() -> openAndRegister(manager)) + .isInstanceOf(TooManyScannersException.class); + + assertThat(manager.activeScannerCountForBucket(tableBucket)).isEqualTo(2); + + manager.removeScanner(ctx1); + manager.removeScanner(ctx2); + } + } + + @Test + void testPerServerLimit() throws Exception { + putAndFlush(3); + try (ScannerManager manager = createManager(8, 2)) { + ScannerContext ctx1 = openAndRegister(manager); + ScannerContext ctx2 = openAndRegister(manager); + assertThat(manager.activeScannerCount()).isEqualTo(2); + + assertThatThrownBy(() -> openAndRegister(manager)) + .isInstanceOf(TooManyScannersException.class); + + assertThat(manager.activeScannerCount()).isEqualTo(2); + + manager.removeScanner(ctx1); + manager.removeScanner(ctx2); + } + } + + @Test + void testCloseScannersForBucket() throws Exception { + putAndFlush(3); + try (ScannerManager manager = createManager()) { + TableBucket tableBucket = kvTablet.getTableBucket(); + + openAndRegister(manager); + openAndRegister(manager); + assertThat(manager.activeScannerCount()).isEqualTo(2); + + manager.closeScannersForBucket(tableBucket); + + assertThat(manager.activeScannerCount()).isEqualTo(0); + assertThat(manager.activeScannerCountForBucket(tableBucket)).isEqualTo(0); + } + } + + @Test + void testCloseScannersForBucket_marksRecentlyExpired() throws Exception { + putAndFlush(3); + try (ScannerManager manager = createManager()) { + TableBucket tableBucket = kvTablet.getTableBucket(); + + ScannerContext ctx = openAndRegister(manager); + assertThat(ctx).isNotNull(); + byte[] scannerId = ctx.getScannerId(); + + manager.closeScannersForBucket(tableBucket); + + assertThat(manager.getScanner(scannerId)).isNull(); + assertThat(manager.isRecentlyExpired(scannerId)).isTrue(); + } + } + + @Test + void testShutdown_closesAllScanners() throws Exception { + putAndFlush(3); + 
ScannerManager manager = createManager(); + openAndRegister(manager); + openAndRegister(manager); + assertThat(manager.activeScannerCount()).isEqualTo(2); + + manager.close(); + + assertThat(manager.activeScannerCount()).isEqualTo(0); + } + + @Test + void testTryAcquireForUse_rejectsAfterClose() throws Exception { + putAndFlush(3); + try (ScannerManager manager = createManager()) { + ScannerContext ctx = openAndRegister(manager); + assertThat(ctx).isNotNull(); + + manager.removeScanner(ctx); + + assertThat(ctx.tryAcquireForUse()).isFalse(); + } + } + + /** close() force-clears inUse and completes immediately even if inUse=true. */ + @Test + void testClose_notWaitForInUse() throws Exception { + putAndFlush(3); + try (ScannerManager manager = createManager()) { + ScannerContext ctx = openAndRegister(manager); + assertThat(ctx).isNotNull(); + + assertThat(ctx.tryAcquireForUse()).isTrue(); + + // close() should complete immediately despite inUse=true + ctx.close(); + + // after close, tryAcquireForUse must be rejected + assertThat(ctx.tryAcquireForUse()).isFalse(); + + manager.removeScanner(ctx); + } + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java index 5120bf6a6d..fa502f928f 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java @@ -37,6 +37,7 @@ import org.apache.fluss.record.DefaultValueRecordBatch; import org.apache.fluss.record.KvRecord; import org.apache.fluss.record.KvRecordBatch; +import org.apache.fluss.record.KvRecordTestUtils; import org.apache.fluss.record.LogRecord; import org.apache.fluss.record.LogRecordBatch; import org.apache.fluss.record.LogRecordReadContext; @@ -62,6 +63,7 @@ import org.apache.fluss.server.entity.NotifyLeaderAndIsrResultForBucket; import 
org.apache.fluss.server.entity.StopReplicaData; import org.apache.fluss.server.entity.StopReplicaResultForBucket; +import org.apache.fluss.server.kv.KvTablet; import org.apache.fluss.server.kv.rocksdb.RocksDBKv; import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; import org.apache.fluss.server.log.FetchParams; @@ -72,6 +74,7 @@ import org.apache.fluss.server.metadata.ServerInfo; import org.apache.fluss.server.metadata.TableMetadata; import org.apache.fluss.server.testutils.KvTestUtils; +import org.apache.fluss.server.zk.NOPErrorHandler; import org.apache.fluss.server.zk.data.LeaderAndIsr; import org.apache.fluss.server.zk.data.TableRegistration; import org.apache.fluss.testutils.DataTestUtils; @@ -2374,4 +2377,40 @@ private void assertUpdateMetadataEquals( } }); } + + @Test + void testStopReplicas_closesScanners() throws Exception { + TableBucket tb = new TableBucket(DATA1_TABLE_ID_PK, 0); + makeKvTableAsLeader(DATA1_TABLE_ID_PK, DATA1_TABLE_PATH_PK, tb.getBucket()); + + KvTablet kvTablet = + kvManager + .getKv(tb) + .orElseThrow(() -> new IllegalStateException("KvTablet not found")); + KvRecordTestUtils.KvRecordBatchFactory batchFactory = + KvRecordTestUtils.KvRecordBatchFactory.of(DEFAULT_SCHEMA_ID); + KvRecordTestUtils.KvRecordFactory recordFactory = + KvRecordTestUtils.KvRecordFactory.of(DATA1_ROW_TYPE); + kvTablet.putAsLeader( + batchFactory.ofRecords( + Collections.singletonList( + recordFactory.ofRecord("k1".getBytes(), new Object[] {1, "v1"}))), + null); + kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE); + + Replica replica = replicaManager.getReplicaOrException(tb); + scannerManager.createScanner(replica, null); + assertThat(scannerManager.activeScannerCount()).isEqualTo(1); + + CompletableFuture> future = new CompletableFuture<>(); + replicaManager.stopReplicas( + INITIAL_COORDINATOR_EPOCH, + Collections.singletonList( + new StopReplicaData( + tb, false, false, INITIAL_COORDINATOR_EPOCH, INITIAL_LEADER_EPOCH)), + future::complete); + 
future.get(); + + assertThat(scannerManager.activeScannerCount()).isEqualTo(0); + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java index f6b2e01c87..a9041acbda 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java @@ -37,6 +37,7 @@ import org.apache.fluss.server.coordinator.TestCoordinatorGateway; import org.apache.fluss.server.entity.NotifyLeaderAndIsrData; import org.apache.fluss.server.kv.KvManager; +import org.apache.fluss.server.kv.scan.ScannerManager; import org.apache.fluss.server.kv.snapshot.CompletedKvSnapshotCommitter; import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; import org.apache.fluss.server.kv.snapshot.KvSnapshotDataDownloader; @@ -140,6 +141,7 @@ public class ReplicaTestBase { protected LogManager logManager; protected KvManager kvManager; protected ReplicaManager replicaManager; + protected ScannerManager scannerManager; protected RpcClient rpcClient; protected Configuration conf; protected TabletServerMetadataCache serverMetadataCache; @@ -308,6 +310,9 @@ protected long registerTableInZkClient( protected ReplicaManager buildReplicaManager(CoordinatorGateway coordinatorGateway) throws Exception { + if (scannerManager == null) { + scannerManager = new ScannerManager(conf, scheduler); + } return new ReplicaManager( conf, scheduler, @@ -323,6 +328,7 @@ protected ReplicaManager buildReplicaManager(CoordinatorGateway coordinatorGatew TestingMetricGroups.TABLET_SERVER_METRICS, TestingMetricGroups.USER_METRICS, remoteLogManager, + scannerManager, manualClock, ioExecutor); } @@ -351,6 +357,10 @@ void tearDown() throws Exception { replicaManager.shutdown(); } + if (scannerManager != null) { + scannerManager.close(); + } + if (rpcClient != null) { rpcClient.close(); } @@ -502,7 +512,8 @@ 
private Replica makeReplica( metricGroup, DATA1_TABLE_INFO, manualClock, - remoteLogManager); + remoteLogManager, + scannerManager); } private void initRemoteLogEnv() throws Exception { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java index ff089cb4a9..b68cbe6b76 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java @@ -33,6 +33,7 @@ import org.apache.fluss.server.entity.NotifyLeaderAndIsrData; import org.apache.fluss.server.entity.NotifyLeaderAndIsrResultForBucket; import org.apache.fluss.server.kv.KvManager; +import org.apache.fluss.server.kv.scan.ScannerManager; import org.apache.fluss.server.kv.snapshot.TestingCompletedKvSnapshotCommitter; import org.apache.fluss.server.log.LogManager; import org.apache.fluss.server.metadata.TabletServerMetadataCache; @@ -566,6 +567,7 @@ public TestingReplicaManager( NOPErrorHandler.INSTANCE, serverMetricGroup, USER_METRICS, + new ScannerManager(conf, scheduler), clock, ioExecutor); } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java index 17b436e1ac..8adb942daf 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java @@ -51,6 +51,9 @@ import org.apache.fluss.rpc.messages.PbPutKvRespForBucket; import org.apache.fluss.rpc.messages.ProduceLogResponse; import org.apache.fluss.rpc.messages.PutKvResponse; +import org.apache.fluss.rpc.messages.ScanKvRequest; +import org.apache.fluss.rpc.messages.ScanKvResponse; +import 
org.apache.fluss.rpc.messages.StopReplicaRequest; import org.apache.fluss.rpc.protocol.Errors; import org.apache.fluss.server.entity.NotifyLeaderAndIsrData; import org.apache.fluss.server.entity.NotifyLeaderAndIsrResultForBucket; @@ -113,6 +116,7 @@ import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newPutKvRequest; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getNotifyLeaderAndIsrResponseData; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeNotifyBucketLeaderAndIsr; +import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeStopBucketReplica; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeUpdateMetadataRequest; import static org.apache.fluss.testutils.DataTestUtils.compactedRow; import static org.apache.fluss.testutils.DataTestUtils.genKvRecordBatch; @@ -1075,4 +1079,349 @@ void testLookupWithInsertIfNotExistsAutoIncrement() throws Exception { assertThat(existingRow.getLong(1)).isEqualTo(1L); assertThat(newRow.getLong(1)).isEqualTo(3L); } + + @Test + void testScanKv_newScan_happyPath() throws Exception { + long tableId = + createTable( + FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH_PK, DATA1_TABLE_DESCRIPTOR_PK); + TableBucket tb = new TableBucket(tableId, 0); + FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb); + int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb); + TabletServerGateway leaderGateWay = + FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader); + + assertPutKvResponse( + leaderGateWay + .putKv( + newPutKvRequest( + tableId, 0, 1, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE))) + .get()); + FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tb); + + ScanKvResponse response = + leaderGateWay.scanKv(newScanKvOpenRequest(tableId, 0, 1024 * 1024)).get(); + + assertThat(response.hasErrorCode()).isFalse(); + assertThat(response.getScannerId()).isNotEmpty(); + assertThat(response.isHasMoreResults()).isFalse(); + assertThat(response.hasLogOffset()).isTrue(); 
+ assertThat(response.getLogOffset()).isGreaterThanOrEqualTo(0L); + assertThat(response.hasRecords()).isTrue(); + DefaultValueRecordBatch batch = DefaultValueRecordBatch.pointToBytes(response.getRecords()); + assertThat(batch.getRecordCount()).isEqualTo(2); + } + + @Test + void testScanKv_multiBatchContinuation() throws Exception { + long tableId = + createTable( + FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH_PK, DATA1_TABLE_DESCRIPTOR_PK); + TableBucket tb = new TableBucket(tableId, 0); + FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb); + int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb); + TabletServerGateway leaderGateWay = + FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader); + + assertPutKvResponse( + leaderGateWay + .putKv( + newPutKvRequest( + tableId, 0, 1, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE))) + .get()); + FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tb); + + // batch_size_bytes=1 forces one record per batch via the appendedAny progress guard. + ScanKvResponse first = leaderGateWay.scanKv(newScanKvOpenRequest(tableId, 0, 1)).get(); + assertThat(first.hasErrorCode()).isFalse(); + byte[] scannerId = first.getScannerId(); + long firstLogOffset = first.getLogOffset(); + int totalRecords = + DefaultValueRecordBatch.pointToBytes(first.getRecords()).getRecordCount(); + + ScanKvResponse current = first; + int seq = 0; + while (current.isHasMoreResults()) { + current = + leaderGateWay + .scanKv(newScanKvContinueRequest(scannerId, seq++, /*batch=*/ 1)) + .get(); + assertThat(current.hasErrorCode()).isFalse(); + assertThat(current.getScannerId()).isEqualTo(scannerId); + // log_offset is carried only on the open response. + assertThat(current.hasLogOffset()).isFalse(); + if (current.hasRecords()) { + totalRecords += + DefaultValueRecordBatch.pointToBytes(current.getRecords()).getRecordCount(); + } + } + assertThat(totalRecords).isEqualTo(2); + // After draining, the session is auto-closed. 
+ ScanKvResponse afterDrain = + leaderGateWay.scanKv(newScanKvContinueRequest(scannerId, seq, 1024)).get(); + assertThat(afterDrain.getErrorCode()).isEqualTo(Errors.UNKNOWN_SCANNER_ID.code()); + assertThat(firstLogOffset).isGreaterThanOrEqualTo(0L); + } + + @Test + void testScanKv_explicitClose() throws Exception { + long tableId = + createTable( + FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH_PK, DATA1_TABLE_DESCRIPTOR_PK); + TableBucket tb = new TableBucket(tableId, 0); + FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb); + int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb); + TabletServerGateway leaderGateWay = + FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader); + + assertPutKvResponse( + leaderGateWay + .putKv( + newPutKvRequest( + tableId, 0, 1, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE))) + .get()); + FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tb); + + // Tiny batch keeps the session open after the first response. + ScanKvResponse open = leaderGateWay.scanKv(newScanKvOpenRequest(tableId, 0, 1)).get(); + assertThat(open.hasErrorCode()).isFalse(); + assertThat(open.isHasMoreResults()).isTrue(); + byte[] scannerId = open.getScannerId(); + + ScanKvResponse close = leaderGateWay.scanKv(newScanKvCloseRequest(scannerId)).get(); + assertThat(close.hasErrorCode()).isFalse(); + assertThat(close.getScannerId()).isEqualTo(scannerId); + assertThat(close.isHasMoreResults()).isFalse(); + assertThat(close.hasRecords()).isFalse(); + } + + @Test + void testScanKv_closeOnAlreadyGoneScannerIsNoOp() throws Exception { + long tableId = + createTable( + FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH_PK, DATA1_TABLE_DESCRIPTOR_PK); + TableBucket tb = new TableBucket(tableId, 0); + FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb); + int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb); + TabletServerGateway leaderGateWay = + FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader); + + assertPutKvResponse( + leaderGateWay + .putKv( + newPutKvRequest( + 
tableId, 0, 1, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE))) + .get()); + FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tb); + + ScanKvResponse open = leaderGateWay.scanKv(newScanKvOpenRequest(tableId, 0, 1)).get(); + byte[] scannerId = open.getScannerId(); + + leaderGateWay.scanKv(newScanKvCloseRequest(scannerId)).get(); + + // Second close on the already-gone session must be a benign no-op. + ScanKvResponse second = leaderGateWay.scanKv(newScanKvCloseRequest(scannerId)).get(); + assertThat(second.hasErrorCode()).isFalse(); + assertThat(second.getScannerId()).isEqualTo(scannerId); + assertThat(second.isHasMoreResults()).isFalse(); + } + + @Test + void testScanKv_unknownScannerId() throws Exception { + long tableId = + createTable( + FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH_PK, DATA1_TABLE_DESCRIPTOR_PK); + TableBucket tb = new TableBucket(tableId, 0); + FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb); + int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb); + TabletServerGateway leaderGateWay = + FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader); + + byte[] fakeId = "not-a-real-scanner-id".getBytes(); + ScanKvResponse response = + leaderGateWay.scanKv(newScanKvContinueRequest(fakeId, 0, 1024)).get(); + assertThat(response.getErrorCode()).isEqualTo(Errors.UNKNOWN_SCANNER_ID.code()); + assertThat(response.getErrorMessage()).contains("Unknown scanner ID"); + } + + @Test + void testScanKv_expiredOnRecentlyEvicted() throws Exception { + long tableId = + createTable( + FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH_PK, DATA1_TABLE_DESCRIPTOR_PK); + TableBucket tb = new TableBucket(tableId, 0); + FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb); + int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb); + TabletServerGateway leaderGateWay = + FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader); + + assertPutKvResponse( + leaderGateWay + .putKv( + newPutKvRequest( + tableId, 0, 1, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE))) + .get()); + 
FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tb); + + ScanKvResponse open = leaderGateWay.scanKv(newScanKvOpenRequest(tableId, 0, 1)).get(); + assertThat(open.hasErrorCode()).isFalse(); + assertThat(open.isHasMoreResults()).isTrue(); + byte[] scannerId = open.getScannerId(); + + // stopReplica(delete=false) routes through closeScannersForBucket, which records + // the id in recentlyExpiredIds. + LeaderAndIsr la = FLUSS_CLUSTER_EXTENSION.waitLeaderAndIsrReady(tb); + stopReplicaWithLatestEpoch(leaderGateWay, tb, la); + + ScanKvResponse cont = + leaderGateWay.scanKv(newScanKvContinueRequest(scannerId, 0, 1024)).get(); + assertThat(cont.getErrorCode()).isEqualTo(Errors.SCANNER_EXPIRED.code()); + assertThat(cont.getErrorMessage()).contains("expired"); + } + + @Test + void testScanKv_leadershipFlipMidScan() throws Exception { + long tableId = + createTable( + FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH_PK, DATA1_TABLE_DESCRIPTOR_PK); + TableBucket tb = new TableBucket(tableId, 0); + FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb); + int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb); + TabletServerGateway leaderGateWay = + FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader); + + assertPutKvResponse( + leaderGateWay + .putKv( + newPutKvRequest( + tableId, 0, 1, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE))) + .get()); + FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tb); + + ScanKvResponse open = leaderGateWay.scanKv(newScanKvOpenRequest(tableId, 0, 1)).get(); + assertThat(open.hasErrorCode()).isFalse(); + assertThat(open.isHasMoreResults()).isTrue(); + byte[] scannerId = open.getScannerId(); + + LeaderAndIsr la = FLUSS_CLUSTER_EXTENSION.waitLeaderAndIsrReady(tb); + stopReplicaWithLatestEpoch(leaderGateWay, tb, la); + + ScanKvResponse cont = + leaderGateWay.scanKv(newScanKvContinueRequest(scannerId, 0, 1024)).get(); + assertThat(cont.getErrorCode()) + .isIn(Errors.SCANNER_EXPIRED.code(), Errors.NOT_LEADER_OR_FOLLOWER.code()); + } + + @Test + void 
testScanKv_callSeqIdMismatch() throws Exception { + long tableId = + createTable( + FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH_PK, DATA1_TABLE_DESCRIPTOR_PK); + TableBucket tb = new TableBucket(tableId, 0); + FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb); + int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb); + TabletServerGateway leaderGateWay = + FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader); + + assertPutKvResponse( + leaderGateWay + .putKv( + newPutKvRequest( + tableId, 0, 1, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE))) + .get()); + FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tb); + + ScanKvResponse open = leaderGateWay.scanKv(newScanKvOpenRequest(tableId, 0, 1)).get(); + byte[] scannerId = open.getScannerId(); + + ScanKvResponse bad = + leaderGateWay.scanKv(newScanKvContinueRequest(scannerId, 5, 1024)).get(); + assertThat(bad.getErrorCode()).isEqualTo(Errors.INVALID_SCAN_REQUEST.code()); + assertThat(bad.getErrorMessage()).contains("Out-of-order"); + } + + @Test + void testScanKv_oversizeBatchSizeBytesIsClamped() throws Exception { + long tableId = + createTable( + FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH_PK, DATA1_TABLE_DESCRIPTOR_PK); + TableBucket tb = new TableBucket(tableId, 0); + FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb); + int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb); + TabletServerGateway leaderGateWay = + FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader); + + assertPutKvResponse( + leaderGateWay + .putKv( + newPutKvRequest( + tableId, 0, 1, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE))) + .get()); + FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tb); + + ScanKvResponse response = + leaderGateWay.scanKv(newScanKvOpenRequest(tableId, 0, Integer.MAX_VALUE)).get(); + assertThat(response.hasErrorCode()).isFalse(); + assertThat(response.hasRecords()).isTrue(); + assertThat(DefaultValueRecordBatch.pointToBytes(response.getRecords()).getRecordCount()) + .isEqualTo(2); + } + + @Test + void 
testScanKv_bucketScanReqAndScannerIdRejected() throws Exception { + long tableId = + createTable( + FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH_PK, DATA1_TABLE_DESCRIPTOR_PK); + TableBucket tb = new TableBucket(tableId, 0); + FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb); + int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb); + TabletServerGateway leaderGateWay = + FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader); + + ScanKvRequest bad = new ScanKvRequest(); + bad.setBucketScanReq().setTableId(tableId).setBucketId(0); + bad.setScannerId("ambiguous".getBytes()); + bad.setBatchSizeBytes(1024); + ScanKvResponse response = leaderGateWay.scanKv(bad).get(); + assertThat(response.getErrorCode()).isEqualTo(Errors.INVALID_SCAN_REQUEST.code()); + assertThat(response.getErrorMessage()) + .contains("must not set both bucket_scan_req and scanner_id"); + } + + private static ScanKvRequest newScanKvOpenRequest(long tableId, int bucketId, int batchSize) { + ScanKvRequest req = new ScanKvRequest(); + req.setBucketScanReq().setTableId(tableId).setBucketId(bucketId); + req.setBatchSizeBytes(batchSize); + return req; + } + + private static ScanKvRequest newScanKvContinueRequest( + byte[] scannerId, int callSeqId, int batchSize) { + ScanKvRequest req = new ScanKvRequest(); + req.setScannerId(scannerId); + req.setCallSeqId(callSeqId); + req.setBatchSizeBytes(batchSize); + return req; + } + + private static ScanKvRequest newScanKvCloseRequest(byte[] scannerId) { + ScanKvRequest req = new ScanKvRequest(); + req.setScannerId(scannerId); + req.setCloseScanner(true); + return req; + } + + /** Sends stopReplica using the current coordinator epoch (vs. the helper's hardcoded 0). 
*/ + private static void stopReplicaWithLatestEpoch( + TabletServerGateway leaderGateWay, TableBucket tb, LeaderAndIsr la) throws Exception { + leaderGateWay + .stopReplica( + new StopReplicaRequest() + .setCoordinatorEpoch(la.coordinatorEpoch()) + .addAllStopReplicasReqs( + Collections.singleton( + makeStopBucketReplica( + tb, false, false, la.leaderEpoch())))) + .get(); + } } diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md index 48fc393b80..2f77fb5676 100644 --- a/website/docs/maintenance/configuration.md +++ b/website/docs/maintenance/configuration.md @@ -171,6 +171,11 @@ during the Fluss cluster working. | kv.rocksdb.bloom-filter.block-based-mode | Boolean | false | If true, RocksDB will use block-based filter instead of full filter, this only take effect when bloom filter is used. The default value is `false`. | | kv.rocksdb.shared-rate-limiter-bytes-per-sec | MemorySize | Long.MAX_VALUE | The bytes per second rate limit for RocksDB flush and compaction operations shared across all RocksDB instances on the TabletServer. The rate limiter is always enabled. The default value is Long.MAX_VALUE (effectively unlimited). Set to a lower value (e.g., 100MB) to limit the rate. This configuration can be updated dynamically without server restart. See [Updating Configs](operations/updating-configs.md) for more details. | | kv.recover.log-record-batch.max-size | MemorySize | 16mb | The max fetch size for fetching log to apply to kv during recovering kv. | +| kv.scanner.ttl | Duration | 10min | The time-to-live for an idle KV scanner session on the server. A scanner that has not received a request within this duration will be automatically expired and its resources released. The default value is 10 minutes. | +| kv.scanner.expiration-interval | Duration | 30s | The interval at which the server checks for and removes expired KV scanner sessions. The default value is 30 seconds. 
| +| kv.scanner.max-per-bucket | Integer | 8 | The maximum number of concurrent KV scanner sessions allowed per bucket. New scan requests that exceed this limit will be rejected with a TOO_MANY_SCANNERS error. The default value is 8. | +| kv.scanner.max-per-server | Integer | 200 | The maximum total number of concurrent KV scanner sessions allowed across all buckets on a single tablet server. New scan requests that exceed this limit will be rejected with a TOO_MANY_SCANNERS error. The default value is 200. | +| kv.scanner.max-batch-size | MemorySize | 10mb | Server-side cap on the per-batch payload size for KV full-scan responses. The effective batch size is min(client-requested batch_size_bytes, this value). Protects the tablet server from out-of-memory if a client passes an excessively large batch size. The default value is 10mb. | ## Metrics