Flink: Support writing shredded variant in Flink #15596
Guosmilesmile wants to merge 7 commits into apache:main from
Conversation
.tableProperty(TableProperties.PARQUET_SHRED_VARIANTS)
.defaultValue(TableProperties.PARQUET_SHRED_VARIANTS_DEFAULT)
How will we handle when ORC supports shredding variants?
Good catch. I renamed shred-variants to parquet-shred-variants to clarify that this feature only supports Parquet. If ORC supports shredding later, we can add another config.
Let's do parquet for now since we followed that pattern for the Spark implementation.
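For readers following along, a minimal usage sketch of how this Parquet-only option might be enabled from a Flink job, assuming the usual FlinkSink builder pipeline; the input stream and tableLoader are placeholders, and only the option key "parquet-shred-variants" is taken from this PR's FlinkWriteOptions:

// Hypothetical sketch, not code from this PR: enable shredded variant writes via the
// Flink write option; `input` (DataStream<RowData>) and `tableLoader` are assumed to exist.
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .set("parquet-shred-variants", "true")  // Parquet-only; ORC would need its own option
    .append();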
FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant),
new FlinkVariantShreddingAnalyzer(),
(row, rowType) -> new RowDataSerializer(rowType).copy(row)));
Isn't it costly to recreate this every time we copy a row?
It will increase the cost, but without copying there would be data corruption issues when buffering rows. We ran into this during early development, and the unit tests can reproduce it.
Can we reuse the RowDataSerializer?
With the current BiFunction, (row, rowType) -> new RowDataSerializer(rowType).copy(row) creates a new RowDataSerializer for every buffered row (default buffer = 100). This construction is not free, as it involves walking rowType.getChildren(), building a TypeSerializer[] via InternalSerializers.create, a BinaryRowDataSerializer, and a RowData.FieldGetter[]. Since the engine schema is fixed for the entire file, a factory allows us to build it once and reuse it. Using the Factory Pattern, we can avoid recreating the serializer for a given table schema with every incoming record.
Yes, we can use Function<S, UnaryOperator<D>> instead of BiFunction<D, S, D> to implement this.
+1. We should be able to reuse RowDataSerializer so we don't need to create a new instance for every row.
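To make the suggestion concrete, a minimal sketch (not the PR's exact code) of the factory shape discussed in this thread, using org.apache.flink.table.runtime.typeutils.RowDataSerializer: the serializer is built once per engine schema and the resulting copy function is reused for every buffered row.

// Sketch only: build the RowDataSerializer once per RowType, reuse it per buffered row.
Function<RowType, UnaryOperator<RowData>> copyFuncFactory =
    rowType -> {
      RowDataSerializer serializer = new RowDataSerializer(rowType); // constructed once per schema
      return serializer::copy;                                       // applied to each buffered row
    };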
talatuyarer left a comment
Thanks for putting this together. The structure mirrors the Spark side cleanly, and the test coverage of inference behaviors (tie-breaking, decimal fallback, cross-file types) is genuinely valuable. I believe we should address reconstructing RowDataSerializer on every buffered row.
FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant),
new FlinkVariantShreddingAnalyzer(),
(row, rowType) -> new RowDataSerializer(rowType).copy(row)));
With the current BiFunction, (row, rowType) -> new RowDataSerializer(rowType).copy(row) creates a new RowDataSerializer for every buffered row (default buffer = 100). This construction is not free, as it involves walking rowType.getChildren(), building a TypeSerializer[] via InternalSerializers.create, a BinaryRowDataSerializer, and a RowData.FieldGetter[]. Since the engine schema is fixed for the entire file, a factory allows us to build it once and reuse it. Using the Factory Pattern, we can avoid recreating the serializer for a given table schema with every incoming record.
aihuaxu left a comment
Minor comments. It mirrors what Spark does, and the implementation looks clean to me in general.
.parse();
}

public int variantInferenceBufferSize() {
Should this be Parquet specific as well?
Thanks for pointing that out. Initially I intended to make it a general parameter, but after your suggestion I realized that the table property it references is Parquet-specific, so it still needs to be defined as a Parquet-only parameter.
ConfigOptions.key("parquet-shred-variants").booleanType().defaultValue(false);

public static final ConfigOption<Integer> VARIANT_INFERENCE_BUFFER_SIZE =
ConfigOptions.key("variant-inference-buffer-size").intType().defaultValue(10);
Maybe default to 100 to align with TableProperties.PARQUET_VARIANT_BUFFER_SIZE_DEFAULT value?
You are right. The initial Spark implementation used 10, and it was later changed to 100. I have made it the same.
FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant),
new FlinkVariantShreddingAnalyzer(),
(row, rowType) -> new RowDataSerializer(rowType).copy(row)));
+1. We should be able to reuse RowDataSerializer so we don't need to create a new instance for every row.
SparkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant),
new SparkVariantShreddingAnalyzer(),
InternalRow::copy));
structType -> InternalRow::copy));
We don't need this change anymore. Can you revert it?
Since we changed UnaryOperator<D> to Function<S, UnaryOperator<D>>, the Spark part should be adjusted accordingly.
SparkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant),
new SparkVariantShreddingAnalyzer(),
InternalRow::copy));
structType -> InternalRow::copy));
We don't need this change anymore. Can you revert it?
Same as above.
}

@TestTemplate
public void testDecimalFallbackAfterBuffer() {
My understanding is that this test verifies that rows which don't fit the inferred shredded type are written to the unshredded value field. However, the test only performs a round trip verification. The schema assertion would be the most informative part of this test.
Good point. I will add an assertion to verify the schema.
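As a hedged sketch of what that assertion could look like, the written file's footer can be inspected with the standard Parquet API; the variant column name "data" and the metadata/value/typed_value layout are assumptions to adapt to the actual test:

// Sketch only: verify the shredded schema rather than relying on the round trip alone.
try (ParquetFileReader reader =
    ParquetFileReader.open(HadoopInputFile.fromPath(new Path(parquetFile), new Configuration()))) {
  MessageType fileSchema = reader.getFooter().getFileMetaData().getSchema();
  GroupType variant = fileSchema.getType("data").asGroupType(); // "data" is a placeholder column name
  // rows that do not fit the inferred shredded type should still land in the plain value field
  assertThat(variant.containsField("value")).isTrue();
  assertThat(variant.containsField("typed_value")).isTrue();
}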
}

@TestTemplate
public void testInfrequentFieldPruning() throws IOException {
The test is implicitly coupled to MIN_FIELD_FREQUENCY = 0.10. The ratio of 1/11 (≈ 0.0909) is just below this threshold. If this constant is adjusted, the test's outcome will change without an obvious failure. Either reference the constant or add a comment to document this dependency.
Good point. Added a comment to clarify it.
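For illustration, the other option mentioned above (referencing the constant directly) could look like the sketch below; the constant's owning class is an assumption here:

// Sketch only: 1 occurrence in 11 rows ≈ 0.0909, which must stay below the pruning threshold.
// MIN_FIELD_FREQUENCY is assumed to live on VariantShreddingAnalyzer for this illustration.
assertThat(1.0 / 11).isLessThan(VariantShreddingAnalyzer.MIN_FIELD_FREQUENCY);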
.tableProperty(TableProperties.PARQUET_SHRED_VARIANTS)
.defaultValue(TableProperties.PARQUET_SHRED_VARIANTS_DEFAULT)
Let's do parquet for now since we followed that pattern for the Spark implementation.
public int parquetVariantInferenceBufferSize() {
return confParser
.intConf()
.option(FlinkWriteOptions.PARQUET_VARIANT_INFERENCE_BUFFER_SIZE.key())
Could you add these to the flink documentation similar to the #14297 for spark?
Right, will add it.
public static final ConfigOption<Boolean> PARQUET_SHRED_VARIANTS =
ConfigOptions.key("parquet-shred-variants").booleanType().defaultValue(false);

public static final ConfigOption<Integer> PARQUET_VARIANT_INFERENCE_BUFFER_SIZE =
Could you rename this to VARIANT_INFERENCE_BUFFER_SIZE to be consistent with Spark?
Right, changed it.
private final boolean isBatchReader;
private final VariantShreddingAnalyzer<D, S> variantAnalyzer;
private final UnaryOperator<D> copyFunc;
private final Function<S, UnaryOperator<D>> copyFuncFactory;
Based on this comment thread we decided to keep the UnaryOperator: #14297 (comment). @pvary suggested BiFunction too. Worth exploring which approach is best, or we can keep it as UnaryOperator.
The previous version used BiFunction, which caused RowDataSerializer to be repeatedly created. To avoid this, we use Function<S, UnaryOperator<D>> instead, so that RowDataSerializer can be initialized once before BufferedFileAppender is created.
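To illustrate why both engines are touched by this signature, a hedged sketch of the two factory implementations discussed in this PR (abbreviated, not the exact code): Spark's copy needs no schema-derived state, while Flink derives the serializer from the RowType once and reuses it.

// Sketch only, types abbreviated for illustration.
Function<StructType, UnaryOperator<InternalRow>> sparkCopy = structType -> InternalRow::copy;
Function<RowType, UnaryOperator<RowData>> flinkCopy =
    rowType -> new RowDataSerializer(rowType)::copy; // serializer created once per schema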
GenericParquetReaders.buildReader(icebergSchema, fileSchema),
testAnalyzer,
record -> record);
unused -> oriRecord -> oriRecord);
We changed UnaryOperator<D> to Function<S, UnaryOperator<D>>.
Thanks for cleaning up this PR @Guosmilesmile. Do we need to back-port this to additional Flink versions as well? CC: @pvary @stevenzwu
@nssalian Flink supports the variant type in 2.1, so we only add this feature for the 2.1 version.
This PR mainly adds support in Flink for writing shredded variant data to Iceberg tables, based on #14297.
This PR is based on #14297 and will be adjusted in sync with it.