diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
index c7a3b44c28..23d8f0b2e9 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java
@@ -150,6 +150,15 @@ public class FlinkConnectorOptions {
                                     + "as a small value would cause frequent requests and increase server load. In the future, "
                                     + "once list partitions is optimized, the default value of this parameter can be reduced.");
 
+    public static final ConfigOption<Integer> SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE =
+            ConfigOptions.key("scan.split.assignment.batch-size")
+                    .intType()
+                    .defaultValue(Integer.MAX_VALUE)
+                    .withDescription(
+                            "The maximum number of Fluss source splits assigned to a reader in "
+                                    + "one assignment request. The value must be positive. By default, "
+                                    + "all pending splits for a reader are assigned in one request.");
+
     public static final ConfigOption<Boolean> SINK_IGNORE_DELETE =
             ConfigOptions.key("sink.ignore-delete")
                     .booleanType()
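For reference, a minimal sketch of how the new option would be set from SQL. Only the `scan.split.assignment.batch-size` key is confirmed by this patch; the `'connector' = 'fluss'` identifier, the `'bootstrap.servers'` address, and the table/column names are assumptions for illustration:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ScanBatchSizeExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Cap each assignment request at 16 splits per reader instead of the
        // Integer.MAX_VALUE default, which assigns all pending splits at once.
        tEnv.executeSql(
                "CREATE TEMPORARY TABLE orders_src ("
                        + "  order_id BIGINT,"
                        + "  amount DECIMAL(10, 2)"
                        + ") WITH ("
                        + "  'connector' = 'fluss',"                    // assumed identifier
                        + "  'bootstrap.servers' = 'localhost:9123',"   // assumed address
                        + "  'scan.split.assignment.batch-size' = '16'" // the new option
                        + ")");
    }
}
```

A smaller batch size trades more assignment round-trips for a smoother ramp-up when a reader has a large backlog of pending splits.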
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
index 408a703058..1b082855e6 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
@@ -146,6 +146,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 tableOptions
                         .get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL)
                         .toMillis();
+        int splitAssignmentBatchSize =
+                tableOptions.get(FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE);
         LeaseContext leaseContext = LeaseContext.fromConf(tableOptions);
 
         return new FlinkTableSource(
@@ -163,6 +165,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 tableOptions.get(FlinkConnectorOptions.LOOKUP_INSERT_IF_NOT_EXISTS),
                 cache,
                 partitionDiscoveryIntervalMs,
+                splitAssignmentBatchSize,
                 tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)),
                 tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)),
                 context.getCatalogTable().getOptions(),
@@ -234,6 +237,7 @@ public Set<ConfigOption<?>> optionalOptions() {
                 FlinkConnectorOptions.SCAN_STARTUP_MODE,
                 FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP,
                 FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL,
+                FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE,
                 FlinkConnectorOptions.SCAN_KV_SNAPSHOT_LEASE_ID,
                 FlinkConnectorOptions.SCAN_KV_SNAPSHOT_LEASE_DURATION,
                 FlinkConnectorOptions.LOOKUP_ASYNC,
@@ -365,6 +369,8 @@ private DynamicTableSource createChangelogTableSource(
                 tableOptions
                         .get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL)
                         .toMillis();
+        int splitAssignmentBatchSize =
+                tableOptions.get(FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE);
 
         return new ChangelogFlinkTableSource(
                 TablePath.of(tableIdentifier.getDatabaseName(), baseTableName),
@@ -374,6 +380,7 @@ private DynamicTableSource createChangelogTableSource(
                 isStreamingMode,
                 startupOptions,
                 partitionDiscoveryIntervalMs,
+                splitAssignmentBatchSize,
                 catalogTableOptions);
     }
 
@@ -412,6 +419,8 @@ private DynamicTableSource createBinlogTableSource(
                 tableOptions
                         .get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL)
                         .toMillis();
+        int splitAssignmentBatchSize =
+                tableOptions.get(FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE);
 
         return new BinlogFlinkTableSource(
                 TablePath.of(tableIdentifier.getDatabaseName(), baseTableName),
@@ -421,6 +430,7 @@ private DynamicTableSource createBinlogTableSource(
                 isStreamingMode,
                 startupOptions,
                 partitionDiscoveryIntervalMs,
+                splitAssignmentBatchSize,
                 catalogTableOptions);
     }
 }
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java
index 901995a657..93261e8430 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java
@@ -19,6 +19,7 @@
 import org.apache.fluss.client.initializer.OffsetsInitializer;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.FlinkConnectorOptions;
 import org.apache.fluss.flink.source.deserializer.BinlogDeserializationSchema;
 import org.apache.fluss.flink.source.reader.LeaseContext;
 import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils;
@@ -51,6 +52,7 @@ public class BinlogFlinkTableSource implements ScanTableSource {
     private final boolean streaming;
     private final FlinkConnectorOptionsUtils.StartupOptions startupOptions;
     private final long scanPartitionDiscoveryIntervalMs;
+    private final int splitPerAssignmentBatchSize;
     private final Map<String, String> tableOptions;
 
     // Projection pushdown
@@ -68,6 +70,28 @@ public BinlogFlinkTableSource(
             FlinkConnectorOptionsUtils.StartupOptions startupOptions,
             long scanPartitionDiscoveryIntervalMs,
             Map<String, String> tableOptions) {
+        this(
+                tablePath,
+                flussConfig,
+                binlogOutputType,
+                isPartitioned,
+                streaming,
+                startupOptions,
+                scanPartitionDiscoveryIntervalMs,
+                FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.defaultValue(),
+                tableOptions);
+    }
+
+    public BinlogFlinkTableSource(
+            TablePath tablePath,
+            Configuration flussConfig,
+            org.apache.flink.table.types.logical.RowType binlogOutputType,
+            boolean isPartitioned,
+            boolean streaming,
+            FlinkConnectorOptionsUtils.StartupOptions startupOptions,
+            long scanPartitionDiscoveryIntervalMs,
+            int splitPerAssignmentBatchSize,
+            Map<String, String> tableOptions) {
         this.tablePath = tablePath;
         this.flussConfig = flussConfig;
         this.binlogOutputType = binlogOutputType;
@@ -75,6 +99,7 @@ public BinlogFlinkTableSource(
         this.streaming = streaming;
         this.startupOptions = startupOptions;
         this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
+        this.splitPerAssignmentBatchSize = splitPerAssignmentBatchSize;
         this.tableOptions = tableOptions;
 
         // Extract data columns from the 'before' nested ROW type (index 3)
@@ -129,6 +154,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
                 null,
                 offsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
+                splitPerAssignmentBatchSize,
                 new BinlogDeserializationSchema(),
                 streaming,
                 partitionFilters,
@@ -148,6 +174,7 @@ public DynamicTableSource copy() {
                         streaming,
                         startupOptions,
                         scanPartitionDiscoveryIntervalMs,
+                        splitPerAssignmentBatchSize,
                         tableOptions);
         copy.producedDataType = producedDataType;
         copy.projectedFields = projectedFields;
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java
index f58fac3df9..4304b271b2 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java
@@ -19,6 +19,7 @@
 import org.apache.fluss.client.initializer.OffsetsInitializer;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.FlinkConnectorOptions;
 import org.apache.fluss.flink.source.deserializer.ChangelogDeserializationSchema;
 import org.apache.fluss.flink.source.reader.LeaseContext;
 import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils;
@@ -57,6 +58,7 @@ public class ChangelogFlinkTableSource implements ScanTableSource {
     private final boolean streaming;
     private final FlinkConnectorOptionsUtils.StartupOptions startupOptions;
     private final long scanPartitionDiscoveryIntervalMs;
+    private final int splitPerAssignmentBatchSize;
     private final Map<String, String> tableOptions;
 
     // Projection pushdown
@@ -81,6 +83,28 @@ public ChangelogFlinkTableSource(
             FlinkConnectorOptionsUtils.StartupOptions startupOptions,
             long scanPartitionDiscoveryIntervalMs,
             Map<String, String> tableOptions) {
+        this(
+                tablePath,
+                flussConfig,
+                changelogOutputType,
+                partitionKeyIndexes,
+                streaming,
+                startupOptions,
+                scanPartitionDiscoveryIntervalMs,
+                FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.defaultValue(),
+                tableOptions);
+    }
+
+    public ChangelogFlinkTableSource(
+            TablePath tablePath,
+            Configuration flussConfig,
+            org.apache.flink.table.types.logical.RowType changelogOutputType,
+            int[] partitionKeyIndexes,
+            boolean streaming,
+            FlinkConnectorOptionsUtils.StartupOptions startupOptions,
+            long scanPartitionDiscoveryIntervalMs,
+            int splitPerAssignmentBatchSize,
+            Map<String, String> tableOptions) {
         this.tablePath = tablePath;
         this.flussConfig = flussConfig;
         // The changelogOutputType already includes metadata columns from FlinkCatalog
@@ -89,6 +113,7 @@ public ChangelogFlinkTableSource(
         this.streaming = streaming;
         this.startupOptions = startupOptions;
         this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
+        this.splitPerAssignmentBatchSize = splitPerAssignmentBatchSize;
         this.tableOptions = tableOptions;
 
         // Extract data columns by filtering out metadata columns by name
@@ -166,6 +191,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
                 null,
                 offsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
+                splitPerAssignmentBatchSize,
                 new ChangelogDeserializationSchema(),
                 streaming,
                 partitionFilters,
@@ -185,6 +211,7 @@ public DynamicTableSource copy() {
                         streaming,
                         startupOptions,
                         scanPartitionDiscoveryIntervalMs,
+                        splitPerAssignmentBatchSize,
                         tableOptions);
         copy.producedDataType = producedDataType;
         copy.projectedFields = projectedFields;
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
index efa426e5f9..8535a881a8 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
@@ -19,6 +19,7 @@
 import org.apache.fluss.client.initializer.OffsetsInitializer;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.FlinkConnectorOptions;
 import org.apache.fluss.flink.source.deserializer.DeserializerInitContextImpl;
 import org.apache.fluss.flink.source.deserializer.FlussDeserializationSchema;
 import org.apache.fluss.flink.source.emitter.FlinkRecordEmitter;
@@ -67,6 +68,7 @@ public class FlinkSource<OUT>
     @Nullable private final int[] projectedFields;
     protected final OffsetsInitializer offsetsInitializer;
     protected final long scanPartitionDiscoveryIntervalMs;
+    protected final int splitPerAssignmentBatchSize;
     private final boolean streaming;
     private final FlussDeserializationSchema<OUT> deserializationSchema;
     @Nullable private final Predicate partitionFilters;
@@ -99,6 +101,7 @@ public FlinkSource(
                 logRecordBatchFilter,
                 offsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
+                FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.defaultValue(),
                 deserializationSchema,
                 streaming,
                 partitionFilters,
@@ -121,6 +124,73 @@ public FlinkSource(
             @Nullable Predicate partitionFilters,
             @Nullable LakeSource<LakeSplit> lakeSource,
             LeaseContext leaseContext) {
+        this(
+                flussConf,
+                tablePath,
+                hasPrimaryKey,
+                isPartitioned,
+                sourceOutputType,
+                projectedFields,
+                logRecordBatchFilter,
+                offsetsInitializer,
+                scanPartitionDiscoveryIntervalMs,
+                FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.defaultValue(),
+                deserializationSchema,
+                streaming,
+                partitionFilters,
+                lakeSource,
+                leaseContext);
+    }
+
+    public FlinkSource(
+            Configuration flussConf,
+            TablePath tablePath,
+            boolean hasPrimaryKey,
+            boolean isPartitioned,
+            RowType sourceOutputType,
+            @Nullable int[] projectedFields,
+            @Nullable Predicate logRecordBatchFilter,
+            OffsetsInitializer offsetsInitializer,
+            long scanPartitionDiscoveryIntervalMs,
+            int splitPerAssignmentBatchSize,
+            FlussDeserializationSchema<OUT> deserializationSchema,
+            boolean streaming,
+            @Nullable Predicate partitionFilters,
+            LeaseContext leaseContext) {
+        this(
+                flussConf,
+                tablePath,
+                hasPrimaryKey,
+                isPartitioned,
+                sourceOutputType,
+                projectedFields,
+                logRecordBatchFilter,
+                offsetsInitializer,
+                scanPartitionDiscoveryIntervalMs,
+                splitPerAssignmentBatchSize,
+                deserializationSchema,
+                streaming,
+                partitionFilters,
+                null,
+                leaseContext);
+    }
+
+    public FlinkSource(
+            Configuration flussConf,
+            TablePath tablePath,
+            boolean hasPrimaryKey,
+            boolean isPartitioned,
+            RowType sourceOutputType,
+            @Nullable int[] projectedFields,
+            @Nullable Predicate logRecordBatchFilter,
+            OffsetsInitializer offsetsInitializer,
+            long scanPartitionDiscoveryIntervalMs,
+            int splitPerAssignmentBatchSize,
+            FlussDeserializationSchema<OUT> deserializationSchema,
+            boolean streaming,
+            @Nullable Predicate partitionFilters,
+            @Nullable LakeSource<LakeSplit> lakeSource,
+            LeaseContext leaseContext) {
         this.flussConf = flussConf;
         this.tablePath = tablePath;
         this.hasPrimaryKey = hasPrimaryKey;
@@ -130,6 +200,7 @@ public FlinkSource(
         this.logRecordBatchFilter = logRecordBatchFilter;
         this.offsetsInitializer = offsetsInitializer;
         this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
+        this.splitPerAssignmentBatchSize = splitPerAssignmentBatchSize;
         this.deserializationSchema = deserializationSchema;
         this.streaming = streaming;
         this.partitionFilters = partitionFilters;
@@ -153,6 +224,7 @@ public SplitEnumerator<SourceSplitBase, SourceEnumeratorState> createEnumerator(
                 splitEnumeratorContext,
                 offsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
+                splitPerAssignmentBatchSize,
                 streaming,
                 partitionFilters,
                 lakeSource,
@@ -175,6 +247,7 @@ public SplitEnumerator<SourceSplitBase, SourceEnumeratorState> restoreEnumerator(
                 sourceEnumeratorState.getRemainingHybridLakeFlussSplits(),
                 offsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
+                splitPerAssignmentBatchSize,
                 streaming,
                 partitionFilters,
                 lakeSource,
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index 0da1ba7668..859ef7cfae 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -139,6 +139,7 @@ public class FlinkTableSource
     @Nullable private final LookupCache cache;
 
     private final long scanPartitionDiscoveryIntervalMs;
+    private final int splitPerAssignmentBatchSize;
     private final boolean isDataLakeEnabled;
 
     private final LeaseContext leaseContext;
@@ -194,6 +195,46 @@ public FlinkTableSource(
             @Nullable MergeEngineType mergeEngineType,
             Map<String, String> tableOptions,
             LeaseContext leaseContext) {
+        this(
+                tablePath,
+                flussConfig,
+                tableConfig,
+                tableOutputType,
+                primaryKeyIndexes,
+                bucketKeyIndexes,
+                partitionKeyIndexes,
+                streaming,
+                startupOptions,
+                lookupAsync,
+                insertIfNotExists,
+                cache,
+                scanPartitionDiscoveryIntervalMs,
+                FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.defaultValue(),
+                isDataLakeEnabled,
+                mergeEngineType,
+                tableOptions,
+                leaseContext);
+    }
+
+    public FlinkTableSource(
+            TablePath tablePath,
+            Configuration flussConfig,
+            TableConfig tableConfig,
+            org.apache.flink.table.types.logical.RowType tableOutputType,
+            int[] primaryKeyIndexes,
+            int[] bucketKeyIndexes,
+            int[] partitionKeyIndexes,
+            boolean streaming,
+            FlinkConnectorOptionsUtils.StartupOptions startupOptions,
+            boolean lookupAsync,
+            boolean insertIfNotExists,
+            @Nullable LookupCache cache,
+            long scanPartitionDiscoveryIntervalMs,
+            int splitPerAssignmentBatchSize,
+            boolean isDataLakeEnabled,
+            @Nullable MergeEngineType mergeEngineType,
+            Map<String, String> tableOptions,
+            LeaseContext leaseContext) {
         this.tablePath = tablePath;
         this.flussConfig = flussConfig;
         this.tableOutputType = tableOutputType;
@@ -209,6 +250,7 @@ public FlinkTableSource(
         this.cache = cache;
 
         this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
+        this.splitPerAssignmentBatchSize = splitPerAssignmentBatchSize;
         this.isDataLakeEnabled = isDataLakeEnabled;
         this.leaseContext = leaseContext;
         this.mergeEngineType = mergeEngineType;
@@ -370,6 +412,7 @@ public boolean isBounded() {
                 logRecordBatchFilter,
                 offsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
+                splitPerAssignmentBatchSize,
                 new RowDataDeserializationSchema(),
                 streaming,
                 partitionFilters,
@@ -478,6 +521,7 @@ public DynamicTableSource copy() {
                 insertIfNotExists,
                 cache,
                 scanPartitionDiscoveryIntervalMs,
+                splitPerAssignmentBatchSize,
                 isDataLakeEnabled,
                 mergeEngineType,
                 tableOptions,
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java
index 53cabcca79..795e850821 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java
@@ -20,6 +20,7 @@
 import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.client.initializer.OffsetsInitializer;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.FlinkConnectorOptions;
 import org.apache.fluss.flink.source.deserializer.FlussDeserializationSchema;
 import org.apache.fluss.flink.source.reader.LeaseContext;
 import org.apache.fluss.metadata.TablePath;
@@ -71,6 +72,34 @@ public class FlussSource<OUT> extends FlinkSource<OUT> {
             long scanPartitionDiscoveryIntervalMs,
             FlussDeserializationSchema<OUT> deserializationSchema,
             boolean streaming) {
+        this(
+                flussConf,
+                tablePath,
+                hasPrimaryKey,
+                isPartitioned,
+                sourceOutputType,
+                projectedFields,
+                logRecordBatchFilter,
+                offsetsInitializer,
+                scanPartitionDiscoveryIntervalMs,
+                FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.defaultValue(),
+                deserializationSchema,
+                streaming);
+    }
+
+    FlussSource(
+            Configuration flussConf,
+            TablePath tablePath,
+            boolean hasPrimaryKey,
+            boolean isPartitioned,
+            RowType sourceOutputType,
+            @Nullable int[] projectedFields,
+            @Nullable Predicate logRecordBatchFilter,
+            OffsetsInitializer offsetsInitializer,
+            long scanPartitionDiscoveryIntervalMs,
+            int splitPerAssignmentBatchSize,
+            FlussDeserializationSchema<OUT> deserializationSchema,
+            boolean streaming) {
         // TODO: Support partition pushDown in datastream
         super(
                 flussConf,
@@ -82,6 +111,7 @@ public class FlussSource<OUT> extends FlinkSource<OUT> {
                 logRecordBatchFilter,
                 offsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
+                splitPerAssignmentBatchSize,
                 deserializationSchema,
                 streaming,
                 null,
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java
index 87efc01cd2..1e65491342 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java
@@ -71,6 +71,7 @@ public class FlussSourceBuilder<OUT> {
     private String[] projectedFieldNames;
     private Predicate logRecordBatchFilter;
     private Long scanPartitionDiscoveryIntervalMs;
+    private Integer splitPerAssignmentBatchSize;
     private OffsetsInitializer offsetsInitializer;
 
     private FlussDeserializationSchema<OUT> deserializationSchema;
@@ -133,6 +134,20 @@ public FlussSourceBuilder<OUT> setScanPartitionDiscoveryIntervalMs(
         return this;
     }
 
+    /**
+     * Sets the maximum number of splits assigned to a reader in one assignment request.
+     *
+     * <p>If not specified, the default value from {@link
+     * FlinkConnectorOptions#SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE} is used.
+     *
+     * @param splitPerAssignmentBatchSize maximum splits per assignment request
+     * @return this builder
+     */
+    public FlussSourceBuilder<OUT> setSplitPerAssignmentBatchSize(int splitPerAssignmentBatchSize) {
+        this.splitPerAssignmentBatchSize = splitPerAssignmentBatchSize;
+        return this;
+    }
+
     /**
      * Sets the starting offsets strategy for the Fluss source.
      *
@@ -241,6 +256,10 @@ public FlussSource<OUT> build() {
                             .defaultValue()
                             .toMillis();
         }
+        if (splitPerAssignmentBatchSize == null) {
+            splitPerAssignmentBatchSize =
+                    FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.defaultValue();
+        }
 
         if (this.flussConf == null) {
             this.flussConf = new Configuration();
@@ -317,6 +336,7 @@ public FlussSource<OUT> build() {
                 logRecordBatchFilter,
                 offsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
+                splitPerAssignmentBatchSize,
                 deserializationSchema,
                 true);
     }
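A usage sketch for the new builder setter. Only setScanPartitionDiscoveryIntervalMs and setSplitPerAssignmentBatchSize appear in this patch; the other builder calls and the Order type/deserializer are assumptions modeled on the typical FlussSource builder chain:

```java
// Hypothetical table, address, and deserializer names; the two setters shown
// at the end are the ones defined in this patch.
FlussSource<Order> source =
        FlussSource.<Order>builder()
                .setBootstrapServers("localhost:9123")                       // assumed
                .setDatabase("mydb")                                         // assumed
                .setTable("orders")                                          // assumed
                .setDeserializationSchema(new OrderDeserializationSchema()) // assumed
                .setScanPartitionDiscoveryIntervalMs(10_000L)
                // Throttle assignment: at most 16 splits per request. When left
                // unset, build() falls back to
                // SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.defaultValue() (Integer.MAX_VALUE).
                .setSplitPerAssignmentBatchSize(16)
                .build();
```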
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
index 1a6521f57e..6cd5da3b39 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
@@ -29,6 +29,7 @@
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.UnsupportedVersionException;
+import org.apache.fluss.flink.FlinkConnectorOptions;
 import org.apache.fluss.flink.lake.LakeSplitGenerator;
 import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
 import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
@@ -52,6 +53,7 @@
 import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.row.GenericRow;
 import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.shaded.guava32.com.google.common.collect.Lists;
 import org.apache.fluss.utils.ExceptionUtils;
 
 import org.apache.flink.annotation.Internal;
@@ -83,6 +85,7 @@
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
+import static org.apache.fluss.utils.Preconditions.checkArgument;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 import static org.apache.fluss.utils.Preconditions.checkState;
 
@@ -183,6 +186,37 @@ public class FlinkSourceEnumerator
     @Nullable private final LakeSource<LakeSplit> lakeSource;
 
+    private final int splitPerAssignmentBatchSize;
+
+    public FlinkSourceEnumerator(
+            TablePath tablePath,
+            Configuration flussConf,
+            boolean hasPrimaryKey,
+            boolean isPartitioned,
+            SplitEnumeratorContext<SourceSplitBase> context,
+            OffsetsInitializer startingOffsetsInitializer,
+            long scanPartitionDiscoveryIntervalMs,
+            boolean streaming,
+            @Nullable Predicate partitionFilters,
+            @Nullable LakeSource<LakeSplit> lakeSource,
+            LeaseContext leaseContext,
+            boolean checkpointTriggeredBefore) {
+        this(
+                tablePath,
+                flussConf,
+                hasPrimaryKey,
+                isPartitioned,
+                context,
+                startingOffsetsInitializer,
+                scanPartitionDiscoveryIntervalMs,
+                FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.defaultValue(),
+                streaming,
+                partitionFilters,
+                lakeSource,
+                leaseContext,
+                checkpointTriggeredBefore);
+    }
+
     public FlinkSourceEnumerator(
             TablePath tablePath,
             Configuration flussConf,
@@ -191,6 +225,7 @@ public FlinkSourceEnumerator(
             SplitEnumeratorContext<SourceSplitBase> context,
             OffsetsInitializer startingOffsetsInitializer,
             long scanPartitionDiscoveryIntervalMs,
+            int splitPerAssignmentBatchSize,
             boolean streaming,
             @Nullable Predicate partitionFilters,
             @Nullable LakeSource<LakeSplit> lakeSource,
@@ -207,6 +242,7 @@ public FlinkSourceEnumerator(
                 null,
                 startingOffsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
+                splitPerAssignmentBatchSize,
                 streaming,
                 partitionFilters,
                 lakeSource,
@@ -241,6 +277,43 @@ public FlinkSourceEnumerator(
                 pendingHybridLakeFlussSplits,
                 startingOffsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
+                FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.defaultValue(),
+                streaming,
+                partitionFilters,
+                lakeSource,
+                leaseContext,
+                checkpointTriggeredBefore);
+    }
+
+    public FlinkSourceEnumerator(
+            TablePath tablePath,
+            Configuration flussConf,
+            boolean hasPrimaryKey,
+            boolean isPartitioned,
+            SplitEnumeratorContext<SourceSplitBase> context,
+            Set<TableBucket> assignedTableBuckets,
+            Map<Long, String> assignedPartitions,
+            List<SourceSplitBase> pendingHybridLakeFlussSplits,
+            OffsetsInitializer startingOffsetsInitializer,
+            long scanPartitionDiscoveryIntervalMs,
+            int splitPerAssignmentBatchSize,
+            boolean streaming,
+            @Nullable Predicate partitionFilters,
+            @Nullable LakeSource<LakeSplit> lakeSource,
+            LeaseContext leaseContext,
+            boolean checkpointTriggeredBefore) {
+        this(
+                tablePath,
+                flussConf,
+                hasPrimaryKey,
+                isPartitioned,
+                context,
+                assignedTableBuckets,
+                assignedPartitions,
+                pendingHybridLakeFlussSplits,
+                startingOffsetsInitializer,
+                scanPartitionDiscoveryIntervalMs,
+                splitPerAssignmentBatchSize,
                 streaming,
                 partitionFilters,
                 lakeSource,
@@ -266,6 +339,48 @@ public FlinkSourceEnumerator(
             WorkerExecutor workerExecutor,
             LeaseContext leaseContext,
             boolean checkpointTriggeredBefore) {
+        this(
+                tablePath,
+                flussConf,
+                hasPrimaryKey,
+                isPartitioned,
+                context,
+                assignedTableBuckets,
+                assignedPartitions,
+                pendingHybridLakeFlussSplits,
+                startingOffsetsInitializer,
+                scanPartitionDiscoveryIntervalMs,
+                FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.defaultValue(),
+                streaming,
+                partitionFilters,
+                lakeSource,
+                workerExecutor,
+                leaseContext,
+                checkpointTriggeredBefore);
+    }
+
+    FlinkSourceEnumerator(
+            TablePath tablePath,
+            Configuration flussConf,
+            boolean hasPrimaryKey,
+            boolean isPartitioned,
+            SplitEnumeratorContext<SourceSplitBase> context,
+            Set<TableBucket> assignedTableBuckets,
+            Map<Long, String> assignedPartitions,
+            List<SourceSplitBase> pendingHybridLakeFlussSplits,
+            OffsetsInitializer startingOffsetsInitializer,
+            long scanPartitionDiscoveryIntervalMs,
+            int splitPerAssignmentBatchSize,
+            boolean streaming,
+            @Nullable Predicate partitionFilters,
+            @Nullable LakeSource<LakeSplit> lakeSource,
+            WorkerExecutor workerExecutor,
+            LeaseContext leaseContext,
+            boolean checkpointTriggeredBefore) {
+        checkArgument(
+                splitPerAssignmentBatchSize > 0,
+                "Split assignment batch size must be positive, but was %s.",
+                splitPerAssignmentBatchSize);
         this.tablePath = checkNotNull(tablePath);
         this.flussConf = checkNotNull(flussConf);
         this.hasPrimaryKey = hasPrimaryKey;
@@ -288,6 +403,7 @@ public FlinkSourceEnumerator(
         this.workerExecutor = workerExecutor;
         this.leaseContext = leaseContext;
         this.checkpointTriggeredBefore = checkpointTriggeredBefore;
+        this.splitPerAssignmentBatchSize = splitPerAssignmentBatchSize;
     }
 
     @Override
@@ -927,7 +1043,19 @@ private void assignPendingSplits(Set<Integer> pendingReaders) {
         // Assign pending splits to readers
         if (!incrementalAssignment.isEmpty()) {
             LOG.info("Assigning splits to readers {}", incrementalAssignment);
-            context.assignSplits(new SplitsAssignment<>(incrementalAssignment));
+            for (Map.Entry<Integer, List<SourceSplitBase>> entry :
+                    incrementalAssignment.entrySet()) {
+                int readerId = entry.getKey();
+                List<SourceSplitBase> splits = entry.getValue();
+                Lists.partition(splits, splitPerAssignmentBatchSize).stream()
+                        .forEach(
+                                batchSplits -> {
+                                    context.assignSplits(
+                                            new SplitsAssignment<>(
+                                                    Collections.singletonMap(
+                                                            readerId, batchSplits)));
+                                });
+            }
         }
 
         if (noMoreNewSplits) {
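The batching above leans entirely on Guava's Lists.partition, which views a list as consecutive sublists of the given size, with a smaller trailing sublist when the length is not a multiple of the batch size. A standalone sketch of the resulting assignment pattern (reader id and split names are made up):

```java
// Uses the same shaded Guava Lists class the enumerator imports.
import org.apache.fluss.shaded.guava32.com.google.common.collect.Lists;

import java.util.Arrays;
import java.util.List;

public class SplitBatchingDemo {
    public static void main(String[] args) {
        // Five pending splits for one reader, batch size 2.
        List<String> pendingSplits = Arrays.asList("s0", "s1", "s2", "s3", "s4");
        // Lists.partition yields [s0, s1], [s2, s3], [s4]: the enumerator would
        // therefore issue three assignSplits() calls for this reader instead of one.
        for (List<String> batch : Lists.partition(pendingSplits, 2)) {
            System.out.println(batch);
        }
    }
}
```

This is exactly the shape the new testSplitAssignmentBatchSize test below asserts: three splits with batch size 2 arrive as one assignment of size 2 followed by one of size 1.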
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
index ca4722c9e6..6d239f965b 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
@@ -79,6 +79,7 @@
 import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR;
 import static org.apache.fluss.testutils.DataTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Unit tests for {@link FlinkSourceEnumerator}. */
 class FlinkSourceEnumeratorTest extends FlinkTestBase {
@@ -137,12 +138,78 @@ void testPkTableNoSnapshotSplits() throws Throwable {
                 expectedAssignment.put(i, Collections.singletonList(genLogSplit(tableId, i)));
             }
 
-            Map<Integer, List<SourceSplitBase>> actualAssignment =
-                    getLastReadersAssignments(context);
+            Map<Integer, List<SourceSplitBase>> actualAssignment = getReadersAssignments(context);
             assertThat(actualAssignment).isEqualTo(expectedAssignment);
         }
     }
 
+    @Test
+    void testSplitAssignmentBatchSize() throws Throwable {
+        long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR);
+        try (MockSplitEnumeratorContext<SourceSplitBase> context =
+                new MockSplitEnumeratorContext<>(1)) {
+            FlinkSourceEnumerator enumerator =
+                    new FlinkSourceEnumerator(
+                            DEFAULT_TABLE_PATH,
+                            flussConf,
+                            true,
+                            false,
+                            context,
+                            OffsetsInitializer.full(),
+                            DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
+                            2,
+                            streaming,
+                            null,
+                            null,
+                            LeaseContext.DEFAULT,
+                            false);
+
+            enumerator.start();
+            registerReader(context, enumerator, 0);
+            context.runNextOneTimeCallable();
+
+            List<SplitsAssignment<SourceSplitBase>> assignments =
+                    context.getSplitsAssignmentSequence();
+            assertThat(assignments).hasSize(2);
+            assertThat(assignments.get(0).assignment().get(0)).hasSize(2);
+            assertThat(assignments.get(1).assignment().get(0)).hasSize(1);
+
+            List<SourceSplitBase> assignedSplits = new ArrayList<>();
+            assignments.forEach(
+                    assignment -> assignedSplits.addAll(assignment.assignment().get(0)));
+            assertThat(assignedSplits)
+                    .containsExactly(
+                            genLogSplit(tableId, 0),
+                            genLogSplit(tableId, 1),
+                            genLogSplit(tableId, 2));
+        }
+    }
+
+    @Test
+    void testInvalidSplitAssignmentBatchSize() throws Exception {
+        try (MockSplitEnumeratorContext<SourceSplitBase> context =
+                new MockSplitEnumeratorContext<>(1)) {
+            assertThatThrownBy(
+                            () ->
+                                    new FlinkSourceEnumerator(
+                                            DEFAULT_TABLE_PATH,
+                                            flussConf,
+                                            true,
+                                            false,
+                                            context,
+                                            OffsetsInitializer.full(),
+                                            DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
+                                            0,
+                                            streaming,
+                                            null,
+                                            null,
+                                            LeaseContext.DEFAULT,
+                                            false))
+                    .isInstanceOf(IllegalArgumentException.class)
+                    .hasMessageContaining("Split assignment batch size must be positive");
+        }
+    }
+
     @Test
     void testPkTableWithSnapshotSplits() throws Throwable {
         long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR);
@@ -177,8 +244,7 @@ void testPkTableWithSnapshotSplits() throws Throwable {
             // make enumerate to get splits and assign
            context.runNextOneTimeCallable();
 
-            Map<Integer, List<SourceSplitBase>> actualAssignment =
-                    getLastReadersAssignments(context);
+            Map<Integer, List<SourceSplitBase>> actualAssignment = getReadersAssignments(context);
 
             Map<Integer, List<SourceSplitBase>> expectedAssignment = new HashMap<>();
@@ -264,8 +330,7 @@ void testNonPkTable() throws Throwable {
                             new LogSplit(new TableBucket(tableId, i), null, -2L)));
             }
 
-            Map<Integer, List<SourceSplitBase>> actualAssignment =
-                    getLastReadersAssignments(context);
+            Map<Integer, List<SourceSplitBase>> actualAssignment = getReadersAssignments(context);
             assertThat(actualAssignment).isEqualTo(expectedAssignment);
         }
     }
@@ -477,10 +542,11 @@ void testDiscoverPartitionsPeriodically(boolean isPrimaryKeyTable) throws Throwable {
         createPartitions(zooKeeperClient, DEFAULT_TABLE_PATH, newPartitions);
 
         // invoke partition discovery callable again and there should be assignments.
+        int assignmentStart = context.getSplitsAssignmentSequence().size();
         runPeriodicPartitionDiscovery(workExecutor);
 
         expectedAssignment = expectAssignments(enumerator, tableId, newPartitionNameIds);
-        actualAssignments = getLastReadersAssignments(context);
+        actualAssignments = getReadersAssignments(context, assignmentStart);
         checkAssignmentIgnoreOrder(actualAssignments, expectedAssignment);
 
         // drop + create partitions;
@@ -493,6 +559,7 @@ void testDiscoverPartitionsPeriodically(boolean isPrimaryKeyTable) throws Throwable {
         createPartitions(zooKeeperClient, DEFAULT_TABLE_PATH, newPartitions);
 
         // invoke partition discovery callable again
+        assignmentStart = context.getSplitsAssignmentSequence().size();
         runPeriodicPartitionDiscovery(workExecutor);
 
         // there should be partition removed events
@@ -513,7 +580,7 @@ void testDiscoverPartitionsPeriodically(boolean isPrimaryKeyTable) throws Throwable {
 
         // check new assignments.
         expectedAssignment = expectAssignments(enumerator, tableId, newPartitionNameIds);
-        actualAssignments = getLastReadersAssignments(context);
+        actualAssignments = getReadersAssignments(context, assignmentStart);
         checkAssignmentIgnoreOrder(actualAssignments, expectedAssignment);
 
         Map<Long, String> assignedPartitions =
@@ -916,11 +983,21 @@ private LogSplit genLogSplit(long tableId, int bucketId) {
 
     private Map<Integer, List<SourceSplitBase>> getReadersAssignments(
             MockSplitEnumeratorContext<SourceSplitBase> context) {
+        return getReadersAssignments(context, 0);
+    }
+
+    private Map<Integer, List<SourceSplitBase>> getReadersAssignments(
+            MockSplitEnumeratorContext<SourceSplitBase> context, int startIndex) {
         List<SplitsAssignment<SourceSplitBase>> splitsAssignments =
                 context.getSplitsAssignmentSequence();
         Map<Integer, List<SourceSplitBase>> assignment = new HashMap<>();
-        for (SplitsAssignment<SourceSplitBase> splitAssignment : splitsAssignments) {
-            assignment.putAll(splitAssignment.assignment());
+        for (int i = startIndex; i < splitsAssignments.size(); i++) {
+            for (Map.Entry<Integer, List<SourceSplitBase>> splitAssignment :
+                    splitsAssignments.get(i).assignment().entrySet()) {
+                assignment
+                        .computeIfAbsent(splitAssignment.getKey(), key -> new ArrayList<>())
+                        .addAll(splitAssignment.getValue());
+            }
         }
         return assignment;
     }