
Flink: Support writing shredded variant in Flink #15596

Open

Guosmilesmile wants to merge 7 commits into apache:main from Guosmilesmile:flink_shredded_varisnt_fileformat

Conversation

Contributor

@Guosmilesmile commented Mar 12, 2026

This PR adds support in Flink for writing shredded variant data to Iceberg tables. It is based on #14297 and will be adjusted in sync with it.

@Guosmilesmile marked this pull request as draft on March 12, 2026 08:09
@Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch from 15ff223 to 5b448b9 on March 12, 2026 08:22
@github-actions bot removed the ORC label on Mar 12, 2026
@Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch 2 times, most recently from 8f6198a to b03caf6 on March 12, 2026 08:59
@Guosmilesmile changed the title from "Core,Flink: Support writing shredded variant in Flink" to "Flink: Support writing shredded variant in Flink" on Mar 12, 2026
@Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch 3 times, most recently from 88045e1 to cbfa8c2 on March 13, 2026 07:17
@Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch from fae2814 to f3a2fba on March 24, 2026 05:50
@github-actions bot added the core label on Mar 24, 2026
@Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch 4 times, most recently from b07b00b to c95d78f on March 24, 2026 08:36
@Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch 3 times, most recently from fc8c45a to b116f25 on April 1, 2026 01:50
@Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch from b116f25 to 650cb7a on April 10, 2026 09:42
@Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch from 770d9c4 to 7d48389 on May 7, 2026 06:55
@Guosmilesmile marked this pull request as ready for review on May 7, 2026 08:34
Contributor Author

Hi @aihuaxu @nssalian @pvary @mxm. Since the Spark part has been merged, the Flink part has been adjusted accordingly. If you have time, please help review it.

Thanks!
GuoYu.

Comment on lines +270 to +271
.tableProperty(TableProperties.PARQUET_SHRED_VARIANTS)
.defaultValue(TableProperties.PARQUET_SHRED_VARIANTS_DEFAULT)
Contributor


How will we handle when ORC supports shredding variants?

Contributor Author


Good catch. I renamed shred-variants to parquet-shred-variants to clarify that this feature only supports Parquet. If ORC supports it later, we can add another config.

Contributor


Let's do parquet for now since we followed that pattern for the Spark implementation.

FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant),
new FlinkVariantShreddingAnalyzer(),
(row, rowType) -> new RowDataSerializer(rowType).copy(row)));
Contributor


Isn't it costly to recreate this every time we copy a row?

Contributor Author


It does increase the cost, but without copying there would be data corruption when buffering rows. We ran into this during early development, and the unit tests can reproduce it.
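A minimal sketch of the failure mode described here (standalone and hypothetical, not the PR's code): if the reader reuses one mutable RowData instance, buffering the bare reference means every buffered entry ends up reflecting the last row read, which is why a deep copy is required before buffering.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

public class BufferWithoutCopySketch {
  public static void main(String[] args) {
    List<RowData> buffer = new ArrayList<>();
    GenericRowData reused = new GenericRowData(1); // stands in for a reader-owned, reused row

    for (int i = 0; i < 3; i++) {
      reused.setField(0, i); // the "reader" overwrites the same instance
      buffer.add(reused);    // bug: buffers the reference, not the contents
    }

    // Prints "2 2 2" instead of "0 1 2" -- the buffered rows were silently corrupted.
    buffer.forEach(row -> System.out.print(row.getInt(0) + " "));
  }
}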

Contributor


Can we reuse the RowDataSerializer?

Contributor


With the current BiFunction, (row, rowType) -> new RowDataSerializer(rowType).copy(row) creates a new RowDataSerializer for every buffered row (default buffer = 100). This construction is not free, as it involves walking rowType.getChildren(), building a TypeSerializer[] via InternalSerializers.create, a BinaryRowDataSerializer, and a RowData.FieldGetter[]. Since the engine schema is fixed for the entire file, a factory allows us to build it once and reuse it. Using the Factory Pattern, we can avoid recreating the serializer for a given table schema with every incoming record.

Contributor Author


Yes, we can use Function<S, UnaryOperator<D>> instead of BiFunction<D, S, D> to implement this.
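For illustration, a sketch of what that signature buys (wiring is hypothetical, not the PR's exact code): with S = RowType and D = RowData, the factory is applied once per engine schema, so the RowDataSerializer is constructed a single time and only its cheap per-row copy runs inside the buffer loop.

import java.util.function.Function;
import java.util.function.UnaryOperator;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.RowType;

public class CopyFuncFactorySketch {
  // Function<S, UnaryOperator<D>> instantiated as Function<RowType, UnaryOperator<RowData>>.
  static final Function<RowType, UnaryOperator<RowData>> COPY_FUNC_FACTORY =
      rowType -> {
        // Built once per file schema, not once per buffered row.
        RowDataSerializer serializer = new RowDataSerializer(rowType);
        return serializer::copy;
      };
}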

Contributor


+1. We should be able to reuse RowDataSerializer so we don't need to create a new instance for every row.

@Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch from 63ae5ae to 0f2ae10 on May 9, 2026 05:33
Contributor

@talatuyarer left a comment


Thanks for putting this together. The structure mirrors the Spark side cleanly, and the test coverage of inference behaviors (tie-breaking, decimal fallback, cross-file types) is genuinely valuable. I believe we should address reconstructing RowDataSerializer on every buffered row.


@github-actions bot added the build label on May 9, 2026
@Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch from 23b94ed to 1c72097 on May 9, 2026 06:55
Contributor

@aihuaxu left a comment


Minor comments. It mirrors what Spark does, and the implementation looks clean to me in general.

.parse();
}

public int variantInferenceBufferSize() {
Contributor


Should this be Parquet specific as well?

Contributor Author


Thanks for pointing that out. Initially I intended to make it a general parameter, but after your suggestion I realized that the table property it references is Parquet-specific, so it needs to be defined as a Parquet-only parameter.

ConfigOptions.key("parquet-shred-variants").booleanType().defaultValue(false);

public static final ConfigOption<Integer> VARIANT_INFERENCE_BUFFER_SIZE =
ConfigOptions.key("variant-inference-buffer-size").intType().defaultValue(10);
Contributor


Maybe default to 100 to align with TableProperties.PARQUET_VARIANT_BUFFER_SIZE_DEFAULT value?

Contributor Author


You are right. The first Spark implementation used 10, but it was later changed to 100. I have made it the same.
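For context, a hedged usage sketch (the option keys are taken from the diffs in this PR; the sink wiring is illustrative and the final key names may still change in review): enabling shredded variant writes and tuning the inference buffer through Flink write options.

import java.util.Map;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;

import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class ShreddedVariantWriteSketch {
  static void configureSink(DataStream<RowData> rows, TableLoader tableLoader) {
    // Keys as they appear in this PR's FlinkWriteOptions; defaults are false and 100.
    Map<String, String> writeOptions =
        ImmutableMap.of(
            "parquet-shred-variants", "true",
            "variant-inference-buffer-size", "100");

    FlinkSink.forRowData(rows)
        .tableLoader(tableLoader)
        .setAll(writeOptions)
        .append();
  }
}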


SparkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant),
new SparkVariantShreddingAnalyzer(),
InternalRow::copy));
structType -> InternalRow::copy));
Contributor

@talatuyarer May 9, 2026


We don't need this change anymore. Can you revert it?

Contributor Author


Since we changed UnaryOperator<D> to Function<S, UnaryOperator<D>>, the Spark part should be adjusted accordingly.

SparkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant),
new SparkVariantShreddingAnalyzer(),
InternalRow::copy));
structType -> InternalRow::copy));
Contributor

@talatuyarer May 9, 2026


We don't need this change anymore. Can you revert it?

Contributor Author


Same as above.

}

@TestTemplate
public void testDecimalFallbackAfterBuffer() {
Contributor


My understanding is that this test verifies that rows which don't fit the inferred shredded type are written to the unshredded value field. However, the test only performs a round trip verification. The schema assertion would be the most informative part of this test.

Contributor Author


Good point. I will add an assertion to verify the schema.
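A sketch of the kind of assertion being asked for (the column and file names are hypothetical; only standard parquet-hadoop footer APIs are used): read the written file's footer and check that the variant column retains the untyped value branch that fallback rows are written to.

import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;

public class ShreddedSchemaAssertionSketch {
  static void assertFallbackBranch(String file) throws IOException {
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(new Path(file), new Configuration()))) {
      MessageType fileSchema = reader.getFooter().getFileMetaData().getSchema();
      // "var" is a hypothetical variant column name; a shredded variant carries both a
      // typed_value branch and an untyped binary value branch for non-conforming rows.
      GroupType variantGroup = fileSchema.getType("var").asGroupType();
      assertThat(variantGroup.containsField("value")).isTrue();
    }
  }
}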

}

@TestTemplate
public void testInfrequentFieldPruning() throws IOException {
Contributor


The test is implicitly coupled to MIN_FIELD_FREQUENCY = 0.10. The ratio of 1/11 (≈ 0.0909) is just below this threshold. If this constant is adjusted, the test's outcome will change without an obvious failure. Either reference the constant or add a comment to document this dependency.

Contributor Author


Good point. I will add a comment to clarify it.
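A sketch of the suggested comment (the constant's owning class is an assumption; the arithmetic comes from the review): spell out the coupling so a threshold change surfaces here instead of silently flipping the test's outcome.

// 1 occurrence in 11 rows ~= 0.0909, deliberately just below the pruning
// threshold MIN_FIELD_FREQUENCY = 0.10 in the shredding analyzer: the
// infrequent field must be dropped from the inferred shredding schema.
// If that constant changes, revisit this test's expectations (referencing
// the constant directly would be safer if it is visible to tests).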


public int parquetVariantInferenceBufferSize() {
return confParser
.intConf()
.option(FlinkWriteOptions.PARQUET_VARIANT_INFERENCE_BUFFER_SIZE.key())
Contributor


Could you add these to the Flink documentation, similar to #14297 for Spark?

Contributor Author

@Guosmilesmile May 10, 2026


Right, will add it.

public static final ConfigOption<Boolean> PARQUET_SHRED_VARIANTS =
ConfigOptions.key("parquet-shred-variants").booleanType().defaultValue(false);

public static final ConfigOption<Integer> PARQUET_VARIANT_INFERENCE_BUFFER_SIZE =
Contributor

@nssalian May 9, 2026


Could you rename this to VARIANT_INFERENCE_BUFFER_SIZE to be consistent with Spark?

Contributor Author


Right, I will change it.

private final boolean isBatchReader;
private final VariantShreddingAnalyzer<D, S> variantAnalyzer;
private final UnaryOperator<D> copyFunc;
private final Function<S, UnaryOperator<D>> copyFuncFactory;
Contributor


Based on this comment thread we decided to keep the UnaryOperator: #14297 (comment). @pvary suggested BiFunction too. Worth exploring what the best approach is, or we can keep it as UnaryOperator.

Contributor Author


The previous version used BiFunction, which caused RowDataSerializer to be repeatedly created. To avoid this, we use Function<S, UnaryOperator<D>> instead, so that RowDataSerializer can be initialized once before BufferedFileAppender is created.
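A minimal generic sketch of that initialization order (class and method names are hypothetical, not the PR's actual BufferedFileAppender): the factory runs once in the constructor, and only the cheap copy runs per buffered row.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.UnaryOperator;

class BufferedAppenderSketch<D, S> {
  private final UnaryOperator<D> copyFunc;
  private final List<D> buffer = new ArrayList<>();

  BufferedAppenderSketch(S engineSchema, Function<S, UnaryOperator<D>> copyFuncFactory) {
    // The serializer-backed copy function is created exactly once, up front.
    this.copyFunc = copyFuncFactory.apply(engineSchema);
  }

  void add(D row) {
    // Per-row work is just the copy; no serializer construction here.
    buffer.add(copyFunc.apply(row));
  }
}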

GenericParquetReaders.buildReader(icebergSchema, fileSchema),
testAnalyzer,
record -> record);
unused -> oriRecord -> oriRecord);
Contributor


What was this change for?

Contributor Author


We changed UnaryOperator<D> to Function<S, UnaryOperator<D>>.

Contributor

nssalian commented May 9, 2026

Thanks for cleaning up this PR @Guosmilesmile. Do we need to backport this to additional Flink versions as well? CC: @pvary @stevenzwu

Contributor Author


@nssalian Flink supports the variant type starting in 2.1, so we only add this feature for Flink 2.1.

@github-actions bot added the docs label on May 10, 2026
@Guosmilesmile force-pushed the flink_shredded_varisnt_fileformat branch from 1ae2aac to 8bf41e8 on May 10, 2026 06:37