Skip to content

[flink] introduce batched splits assignment mechanism#3288

Open
zuston wants to merge 2 commits intoapache:mainfrom
zuston:multisplitassign
Open

[flink] introduce batched splits assignment mechanism#3288
zuston wants to merge 2 commits intoapache:mainfrom
zuston:multisplitassign

Conversation

@zuston
Copy link
Copy Markdown
Member

@zuston zuston commented May 9, 2026

Purpose

Linked issue: close #3287

This PR introduces per-subtask batched split assignment to avoid RPC payloads exceeding size limits.

Brief change log

Tests

API and Format

Documentation

@zuston zuston marked this pull request as ready for review May 9, 2026 06:50
@luoyuxia luoyuxia requested a review from Copilot May 9, 2026 07:49
@luoyuxia
Copy link
Copy Markdown
Contributor

luoyuxia commented May 9, 2026

cc @loserwang1024

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a per-reader batched split assignment mechanism in the Flink source enumerator to reduce the risk of exceeding Flink RPC message size limits when a reader is assigned a very large number of splits.

Changes:

  • Add a new connector option scan.split.assignment.batch-size and plumb it through builder/table sources into FlinkSourceEnumerator.
  • Update FlinkSourceEnumerator to partition per-reader split assignments into multiple assignSplits(...) calls.
  • Extend/adjust enumerator unit tests to account for batched assignment sequences and add validation coverage for invalid batch size.

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java Adds tests for batched assignment behavior and invalid batch size; updates assignment assertions to aggregate multiple assignment events.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java Adds builder support for configuring split-assignment batch size and applies defaulting.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java Threads batch-size into FlinkSource construction for DataStream usage.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java Threads batch-size through table source construction and copy methods.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java Adds/extends constructors and propagates batch-size into the enumerator.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java Implements the actual per-reader batching in split assignment and validates the batch-size argument.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java Threads batch-size into changelog table source runtime provider and copy.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java Threads batch-size into binlog table source runtime provider and copy.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java Introduces the new scan.split.assignment.batch-size config option.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java Reads the new option from table config, passes it into created sources, and exposes it as an optional option.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +146 to +149
public FlussSourceBuilder<OUT> setSplitPerAssignmentBatchSize(int splitPerAssignmentBatchSize) {
this.splitPerAssignmentBatchSize = splitPerAssignmentBatchSize;
return this;
}
Comment on lines 145 to 151
long partitionDiscoveryIntervalMs =
tableOptions
.get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL)
.toMillis();
int splitAssignmentBatchSize =
tableOptions.get(FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE);

Comment on lines +153 to +160
public static final ConfigOption<Integer> SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE =
ConfigOptions.key("scan.split.assignment.batch-size")
.intType()
.defaultValue(Integer.MAX_VALUE)
.withDescription(
"The maximum number of Fluss source splits assigned to a reader in "
+ "one assignment request. The value must be positive. By default, "
+ "all pending splits for a reader are assigned in one request.");
@@ -927,7 +1043,19 @@ private void assignPendingSplits(Set<Integer> pendingReaders) {
// Assign pending splits to readers
if (!incrementalAssignment.isEmpty()) {
LOG.info("Assigning splits to readers {}", incrementalAssignment);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Batched split assignment to per flink TM reader

3 participants