@@ -150,6 +150,15 @@ public class FlinkConnectorOptions {
+ "as a small value would cause frequent requests and increase server load. In the future, "
+ "once list partitions is optimized, the default value of this parameter can be reduced.");

public static final ConfigOption<Integer> SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE =
ConfigOptions.key("scan.split.assignment.batch-size")
.intType()
.defaultValue(Integer.MAX_VALUE)
.withDescription(
"The maximum number of Fluss source splits assigned to a reader in "
+ "one assignment request. The value must be positive. By default, "
+ "all pending splits for a reader are assigned in one request.");

public static final ConfigOption<Boolean> SINK_IGNORE_DELETE =
ConfigOptions.key("sink.ignore-delete")
.booleanType()
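For context, a hypothetical way to exercise the new option from Flink's Table API. The table name, schema, and connection settings below are illustrative and not part of this change; only the 'scan.split.assignment.batch-size' key, its positive-value constraint, and its Integer.MAX_VALUE default come from the diff above.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SplitAssignmentBatchSizeExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Cap each assignment request at 16 splits instead of the default
        // Integer.MAX_VALUE, which assigns all pending splits in one request.
        tEnv.executeSql(
                "CREATE TABLE orders ("
                        + "  order_id BIGINT,"
                        + "  amount DOUBLE"
                        + ") WITH ("
                        + "  'connector' = 'fluss'," // illustrative connection settings
                        + "  'bootstrap.servers' = 'localhost:9123',"
                        + "  'scan.split.assignment.batch-size' = '16'"
                        + ")");
    }
}

A smaller batch size trades more assignment round-trips for a more even spread of splits across readers, which is why the default keeps the previous assign-everything behavior.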
@@ -146,6 +146,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
tableOptions
.get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL)
.toMillis();
int splitAssignmentBatchSize =
tableOptions.get(FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE);

LeaseContext leaseContext = LeaseContext.fromConf(tableOptions);
return new FlinkTableSource(
@@ -163,6 +165,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
tableOptions.get(FlinkConnectorOptions.LOOKUP_INSERT_IF_NOT_EXISTS),
cache,
partitionDiscoveryIntervalMs,
splitAssignmentBatchSize,
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)),
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)),
context.getCatalogTable().getOptions(),
@@ -234,6 +237,7 @@ public Set<ConfigOption<?>> optionalOptions() {
FlinkConnectorOptions.SCAN_STARTUP_MODE,
FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP,
FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL,
FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE,
FlinkConnectorOptions.SCAN_KV_SNAPSHOT_LEASE_ID,
FlinkConnectorOptions.SCAN_KV_SNAPSHOT_LEASE_DURATION,
FlinkConnectorOptions.LOOKUP_ASYNC,
@@ -365,6 +369,8 @@ private DynamicTableSource createChangelogTableSource(
tableOptions
.get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL)
.toMillis();
int splitAssignmentBatchSize =
tableOptions.get(FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE);

return new ChangelogFlinkTableSource(
TablePath.of(tableIdentifier.getDatabaseName(), baseTableName),
@@ -374,6 +380,7 @@ private DynamicTableSource createChangelogTableSource(
isStreamingMode,
startupOptions,
partitionDiscoveryIntervalMs,
splitAssignmentBatchSize,
catalogTableOptions);
}

@@ -412,6 +419,8 @@ private DynamicTableSource createBinlogTableSource(
tableOptions
.get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL)
.toMillis();
int splitAssignmentBatchSize =
tableOptions.get(FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE);

return new BinlogFlinkTableSource(
TablePath.of(tableIdentifier.getDatabaseName(), baseTableName),
@@ -421,6 +430,7 @@ private DynamicTableSource createBinlogTableSource(
isStreamingMode,
startupOptions,
partitionDiscoveryIntervalMs,
splitAssignmentBatchSize,
catalogTableOptions);
}
}
@@ -19,6 +19,7 @@

import org.apache.fluss.client.initializer.OffsetsInitializer;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.FlinkConnectorOptions;
import org.apache.fluss.flink.source.deserializer.BinlogDeserializationSchema;
import org.apache.fluss.flink.source.reader.LeaseContext;
import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils;
@@ -51,6 +52,7 @@ public class BinlogFlinkTableSource implements ScanTableSource {
private final boolean streaming;
private final FlinkConnectorOptionsUtils.StartupOptions startupOptions;
private final long scanPartitionDiscoveryIntervalMs;
private final int splitPerAssignmentBatchSize;
private final Map<String, String> tableOptions;

// Projection pushdown
@@ -68,13 +70,36 @@ public BinlogFlinkTableSource(
FlinkConnectorOptionsUtils.StartupOptions startupOptions,
long scanPartitionDiscoveryIntervalMs,
Map<String, String> tableOptions) {
this(
tablePath,
flussConfig,
binlogOutputType,
isPartitioned,
streaming,
startupOptions,
scanPartitionDiscoveryIntervalMs,
FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.defaultValue(),
tableOptions);
}

public BinlogFlinkTableSource(
TablePath tablePath,
Configuration flussConfig,
org.apache.flink.table.types.logical.RowType binlogOutputType,
boolean isPartitioned,
boolean streaming,
FlinkConnectorOptionsUtils.StartupOptions startupOptions,
long scanPartitionDiscoveryIntervalMs,
int splitPerAssignmentBatchSize,
Map<String, String> tableOptions) {
this.tablePath = tablePath;
this.flussConfig = flussConfig;
this.binlogOutputType = binlogOutputType;
this.isPartitioned = isPartitioned;
this.streaming = streaming;
this.startupOptions = startupOptions;
this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
this.splitPerAssignmentBatchSize = splitPerAssignmentBatchSize;
this.tableOptions = tableOptions;

// Extract data columns from the 'before' nested ROW type (index 3)
@@ -129,6 +154,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
null,
offsetsInitializer,
scanPartitionDiscoveryIntervalMs,
splitPerAssignmentBatchSize,
new BinlogDeserializationSchema(),
streaming,
partitionFilters,
@@ -148,6 +174,7 @@ public DynamicTableSource copy() {
streaming,
startupOptions,
scanPartitionDiscoveryIntervalMs,
splitPerAssignmentBatchSize,
tableOptions);
copy.producedDataType = producedDataType;
copy.projectedFields = projectedFields;
@@ -19,6 +19,7 @@

import org.apache.fluss.client.initializer.OffsetsInitializer;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.FlinkConnectorOptions;
import org.apache.fluss.flink.source.deserializer.ChangelogDeserializationSchema;
import org.apache.fluss.flink.source.reader.LeaseContext;
import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils;
@@ -57,6 +58,7 @@ public class ChangelogFlinkTableSource implements ScanTableSource {
private final boolean streaming;
private final FlinkConnectorOptionsUtils.StartupOptions startupOptions;
private final long scanPartitionDiscoveryIntervalMs;
private final int splitPerAssignmentBatchSize;
private final Map<String, String> tableOptions;

// Projection pushdown
@@ -81,6 +83,28 @@ public ChangelogFlinkTableSource(
FlinkConnectorOptionsUtils.StartupOptions startupOptions,
long scanPartitionDiscoveryIntervalMs,
Map<String, String> tableOptions) {
this(
tablePath,
flussConfig,
changelogOutputType,
partitionKeyIndexes,
streaming,
startupOptions,
scanPartitionDiscoveryIntervalMs,
FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.defaultValue(),
tableOptions);
}

public ChangelogFlinkTableSource(
TablePath tablePath,
Configuration flussConfig,
org.apache.flink.table.types.logical.RowType changelogOutputType,
int[] partitionKeyIndexes,
boolean streaming,
FlinkConnectorOptionsUtils.StartupOptions startupOptions,
long scanPartitionDiscoveryIntervalMs,
int splitPerAssignmentBatchSize,
Map<String, String> tableOptions) {
this.tablePath = tablePath;
this.flussConfig = flussConfig;
// The changelogOutputType already includes metadata columns from FlinkCatalog
@@ -89,6 +113,7 @@ public ChangelogFlinkTableSource(
this.streaming = streaming;
this.startupOptions = startupOptions;
this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
this.splitPerAssignmentBatchSize = splitPerAssignmentBatchSize;
this.tableOptions = tableOptions;

// Extract data columns by filtering out metadata columns by name
@@ -166,6 +191,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
null,
offsetsInitializer,
scanPartitionDiscoveryIntervalMs,
splitPerAssignmentBatchSize,
new ChangelogDeserializationSchema(),
streaming,
partitionFilters,
@@ -185,6 +211,7 @@ public DynamicTableSource copy() {
streaming,
startupOptions,
scanPartitionDiscoveryIntervalMs,
splitPerAssignmentBatchSize,
tableOptions);
copy.producedDataType = producedDataType;
copy.projectedFields = projectedFields;
@@ -19,6 +19,7 @@

import org.apache.fluss.client.initializer.OffsetsInitializer;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.FlinkConnectorOptions;
import org.apache.fluss.flink.source.deserializer.DeserializerInitContextImpl;
import org.apache.fluss.flink.source.deserializer.FlussDeserializationSchema;
import org.apache.fluss.flink.source.emitter.FlinkRecordEmitter;
@@ -67,6 +68,7 @@ public class FlinkSource<OUT>
@Nullable private final int[] projectedFields;
protected final OffsetsInitializer offsetsInitializer;
protected final long scanPartitionDiscoveryIntervalMs;
protected final int splitPerAssignmentBatchSize;
private final boolean streaming;
private final FlussDeserializationSchema<OUT> deserializationSchema;
@Nullable private final Predicate partitionFilters;
@@ -99,6 +101,7 @@ public FlinkSource(
logRecordBatchFilter,
offsetsInitializer,
scanPartitionDiscoveryIntervalMs,
FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.defaultValue(),
deserializationSchema,
streaming,
partitionFilters,
@@ -121,6 +124,73 @@ public FlinkSource(
@Nullable Predicate partitionFilters,
@Nullable LakeSource<LakeSplit> lakeSource,
LeaseContext leaseContext) {
this(
flussConf,
tablePath,
hasPrimaryKey,
isPartitioned,
sourceOutputType,
projectedFields,
logRecordBatchFilter,
offsetsInitializer,
scanPartitionDiscoveryIntervalMs,
FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.defaultValue(),
deserializationSchema,
streaming,
partitionFilters,
lakeSource,
leaseContext);
}

public FlinkSource(
Configuration flussConf,
TablePath tablePath,
boolean hasPrimaryKey,
boolean isPartitioned,
RowType sourceOutputType,
@Nullable int[] projectedFields,
@Nullable Predicate logRecordBatchFilter,
OffsetsInitializer offsetsInitializer,
long scanPartitionDiscoveryIntervalMs,
int splitPerAssignmentBatchSize,
FlussDeserializationSchema<OUT> deserializationSchema,
boolean streaming,
@Nullable Predicate partitionFilters,
LeaseContext leaseContext) {
this(
flussConf,
tablePath,
hasPrimaryKey,
isPartitioned,
sourceOutputType,
projectedFields,
logRecordBatchFilter,
offsetsInitializer,
scanPartitionDiscoveryIntervalMs,
splitPerAssignmentBatchSize,
deserializationSchema,
streaming,
partitionFilters,
null,
leaseContext);
}

public FlinkSource(
Configuration flussConf,
TablePath tablePath,
boolean hasPrimaryKey,
boolean isPartitioned,
RowType sourceOutputType,
@Nullable int[] projectedFields,
@Nullable Predicate logRecordBatchFilter,
OffsetsInitializer offsetsInitializer,
long scanPartitionDiscoveryIntervalMs,
int splitPerAssignmentBatchSize,
FlussDeserializationSchema<OUT> deserializationSchema,
boolean streaming,
@Nullable Predicate partitionFilters,
@Nullable LakeSource<LakeSplit> lakeSource,
LeaseContext leaseContext) {
this.flussConf = flussConf;
this.tablePath = tablePath;
this.hasPrimaryKey = hasPrimaryKey;
@@ -130,6 +200,7 @@ public FlinkSource(
this.logRecordBatchFilter = logRecordBatchFilter;
this.offsetsInitializer = offsetsInitializer;
this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
this.splitPerAssignmentBatchSize = splitPerAssignmentBatchSize;
this.deserializationSchema = deserializationSchema;
this.streaming = streaming;
this.partitionFilters = partitionFilters;
@@ -153,6 +224,7 @@ public SplitEnumerator<SourceSplitBase, SourceEnumeratorState> createEnumerator(
splitEnumeratorContext,
offsetsInitializer,
scanPartitionDiscoveryIntervalMs,
splitPerAssignmentBatchSize,
streaming,
partitionFilters,
lakeSource,
@@ -175,6 +247,7 @@ public SplitEnumerator<SourceSplitBase, SourceEnumeratorState> restoreEnumerator
sourceEnumeratorState.getRemainingHybridLakeFlussSplits(),
offsetsInitializer,
scanPartitionDiscoveryIntervalMs,
splitPerAssignmentBatchSize,
streaming,
partitionFilters,
lakeSource,
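The enumerator internals that consume splitPerAssignmentBatchSize are collapsed in the hunks above. As a rough illustration of what a per-request cap enables, here is a minimal, self-contained sketch of batched split assignment; it is not the actual FlinkSourceEnumerator code, and the names pendingSplits, addSplit, and nextBatch are invented for this sketch.

import java.util.ArrayList;
import java.util.List;

class BatchedSplitAssignmentSketch {
    // Value plumbed through FlinkSource as splitPerAssignmentBatchSize.
    private final int batchSize;
    // Stand-in for the enumerator's real pending-split bookkeeping.
    private final List<String> pendingSplits = new ArrayList<>();

    BatchedSplitAssignmentSketch(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batch size must be positive");
        }
        this.batchSize = batchSize;
    }

    void addSplit(String split) {
        pendingSplits.add(split);
    }

    // Hand out at most batchSize splits per assignment request,
    // keeping the remainder pending for later requests.
    List<String> nextBatch() {
        int n = Math.min(batchSize, pendingSplits.size());
        List<String> batch = new ArrayList<>(pendingSplits.subList(0, n));
        pendingSplits.subList(0, n).clear();
        return batch;
    }
}

With the default of Integer.MAX_VALUE, Math.min returns the full pending list, matching the documented behavior of assigning all pending splits for a reader in one request.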