From d67df8faf8efe4a688066088bec07724946a82a7 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Sun, 5 Apr 2026 11:08:36 +0800 Subject: [PATCH] [AMORO-4166] [Improvement]: Implement heap-based flush mechanism for SortedPosDeleteWriter to prevent OOM --- .../amoro/io/writer/GenericTaskWriters.java | 20 +- .../writer/MixedTreeNodePosDeleteWriter.java | 51 ++- .../io/writer/SortedPosDeleteWriter.java | 141 ++++++- .../MixedIcebergRewriteExecutor.java | 3 +- .../apache/amoro/table/TableProperties.java | 10 + .../io/TestMixedTreeNodePosDeleteWriter.java | 3 +- .../TestSortedPosDeleteWriterHeapFlush.java | 353 ++++++++++++++++++ .../AdaptHiveGenericTaskWriterBuilder.java | 20 +- .../optimizing/MixedHiveRewriteExecutor.java | 3 +- .../spark/io/UnkeyedUpsertSparkWriter.java | 29 +- .../spark/io/UnkeyedUpsertSparkWriter.java | 29 +- .../spark/io/UnkeyedUpsertSparkWriter.java | 29 +- 12 files changed, 675 insertions(+), 16 deletions(-) create mode 100644 amoro-format-iceberg/src/test/java/org/apache/amoro/io/writer/TestSortedPosDeleteWriterHeapFlush.java diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/GenericTaskWriters.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/GenericTaskWriters.java index 5857402f5c..611f3be1f0 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/GenericTaskWriters.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/GenericTaskWriters.java @@ -170,6 +170,21 @@ public SortedPosDeleteWriter buildBasePosDeleteWriter( org.apache.iceberg.TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX + MetadataColumns.DELETE_FILE_POS.name(), MetricsModes.Full.get().toString()); + double heapUsageRatioThreshold = + PropertyUtil.propertyAsDouble( + table.properties(), + TableProperties.POS_DELETE_FLUSH_HEAP_RATIO, + TableProperties.POS_DELETE_FLUSH_HEAP_RATIO_DEFAULT); + long recordsNumThreshold = + PropertyUtil.propertyAsLong( + table.properties(), + 
TableProperties.POS_DELETE_FLUSH_RECORDS, + TableProperties.POS_DELETE_FLUSH_RECORDS_DEFAULT); + int heapFlushMinRecords = + PropertyUtil.propertyAsInt( + table.properties(), + TableProperties.POS_DELETE_FLUSH_HEAP_MIN_RECORDS, + TableProperties.POS_DELETE_FLUSH_HEAP_MIN_RECORDS_DEFAULT); return new SortedPosDeleteWriter<>( appenderFactory, new CommonOutputFileFactory( @@ -185,7 +200,10 @@ public SortedPosDeleteWriter buildBasePosDeleteWriter( fileFormat, mask, index, - partitionKey); + partitionKey, + recordsNumThreshold, + heapUsageRatioThreshold, + heapFlushMinRecords); } public GenericChangeTaskWriter buildChangeWriter() { diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/MixedTreeNodePosDeleteWriter.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/MixedTreeNodePosDeleteWriter.java index f2de58cccb..6433fa8db3 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/MixedTreeNodePosDeleteWriter.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/MixedTreeNodePosDeleteWriter.java @@ -21,6 +21,7 @@ import org.apache.amoro.data.DataTreeNode; import org.apache.amoro.io.AuthenticatedFileIO; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; +import org.apache.amoro.table.TableProperties; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; @@ -30,9 +31,11 @@ import org.apache.iceberg.io.DeleteWriteResult; import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.FileWriter; +import org.apache.iceberg.util.PropertyUtil; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -62,6 +65,9 @@ public class MixedTreeNodePosDeleteWriter private final String location; private final PartitionSpec spec; + private final double heapUsageRatioThreshold; + private final long recordsNumThreshold; + private final int 
heapFlushMinRecords; public MixedTreeNodePosDeleteWriter( FileAppenderFactory appenderFactory, @@ -71,7 +77,9 @@ public MixedTreeNodePosDeleteWriter( EncryptionManager encryptionManager, Long transactionId, String location, - PartitionSpec spec) { + PartitionSpec spec, + Map properties) { + Map safeProperties = properties == null ? Collections.emptyMap() : properties; this.appenderFactory = appenderFactory; this.format = format; this.partition = partition; @@ -80,6 +88,42 @@ public MixedTreeNodePosDeleteWriter( this.transactionId = transactionId; this.location = location; this.spec = spec; + this.heapUsageRatioThreshold = + PropertyUtil.propertyAsDouble( + safeProperties, + TableProperties.POS_DELETE_FLUSH_HEAP_RATIO, + TableProperties.POS_DELETE_FLUSH_HEAP_RATIO_DEFAULT); + this.recordsNumThreshold = + PropertyUtil.propertyAsLong( + safeProperties, + TableProperties.POS_DELETE_FLUSH_RECORDS, + TableProperties.POS_DELETE_FLUSH_RECORDS_DEFAULT); + this.heapFlushMinRecords = + PropertyUtil.propertyAsInt( + safeProperties, + TableProperties.POS_DELETE_FLUSH_HEAP_MIN_RECORDS, + TableProperties.POS_DELETE_FLUSH_HEAP_MIN_RECORDS_DEFAULT); + } + + public MixedTreeNodePosDeleteWriter( + FileAppenderFactory appenderFactory, + FileFormat format, + StructLike partition, + AuthenticatedFileIO fileIO, + EncryptionManager encryptionManager, + Long transactionId, + String location, + PartitionSpec spec) { + this( + appenderFactory, + format, + partition, + fileIO, + encryptionManager, + transactionId, + location, + spec, + Collections.emptyMap()); } @Override @@ -109,7 +153,10 @@ private SortedPosDeleteWriter generatePosDelete(DataTreeNode treeNode) { format, treeNode.mask(), treeNode.index(), - partition); + partition, + recordsNumThreshold, + heapUsageRatioThreshold, + heapFlushMinRecords); } public List complete() throws IOException { diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/SortedPosDeleteWriter.java 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/SortedPosDeleteWriter.java index 2464cd1949..710e1888a5 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/SortedPosDeleteWriter.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/SortedPosDeleteWriter.java @@ -49,6 +49,27 @@ */ public class SortedPosDeleteWriter implements Closeable { private static final long DEFAULT_RECORDS_NUM_THRESHOLD = Long.MAX_VALUE; + private static final Runtime RUNTIME = Runtime.getRuntime(); + // Note: totalMemory/freeMemory only reflect the currently allocated heap, not max heap. + // usedMemory() also counts garbage not yet collected, so it can overestimate live data. + private static final HeapUsageProvider RUNTIME_HEAP_USAGE = + new HeapUsageProvider() { + @Override + public long maxMemory() { + return RUNTIME.maxMemory(); + } + + @Override + public long usedMemory() { + return RUNTIME.totalMemory() - RUNTIME.freeMemory(); + } + }; + + interface HeapUsageProvider { + long maxMemory(); + + long usedMemory(); + } private final Map>> posDeletes = Maps.newHashMap(); private final List completedFiles = Lists.newArrayList(); @@ -61,6 +82,9 @@ public class SortedPosDeleteWriter implements Closeable { private final FileFormat format; private final TaskWriterKey writerKey; private final long recordsNumThreshold; + private final double heapUsageRatioThreshold; + private final int heapFlushMinRecords; + private final HeapUsageProvider heapUsageProvider; private int records = 0; @@ -72,7 +96,35 @@ public SortedPosDeleteWriter( long mask, long index, StructLike partitionKey, - long recordsNumThreshold) { + long recordsNumThreshold, + double heapUsageRatioThreshold, + int heapFlushMinRecords) { + this( + appenderFactory, + fileFactory, + io, + format, + mask, + index, + partitionKey, + recordsNumThreshold, + heapUsageRatioThreshold, + heapFlushMinRecords, + RUNTIME_HEAP_USAGE); + } + + SortedPosDeleteWriter( + FileAppenderFactory 
appenderFactory, + OutputFileFactory fileFactory, + AuthenticatedFileIO io, + FileFormat format, + long mask, + long index, + StructLike partitionKey, + long recordsNumThreshold, + double heapUsageRatioThreshold, + int heapFlushMinRecords, + HeapUsageProvider heapUsageProvider) { this.appenderFactory = appenderFactory; this.fileFactory = fileFactory; this.io = io; @@ -80,6 +132,31 @@ public SortedPosDeleteWriter( this.writerKey = new TaskWriterKey(partitionKey, DataTreeNode.of(mask, index), DataFileType.POS_DELETE_FILE); this.recordsNumThreshold = recordsNumThreshold; + this.heapUsageRatioThreshold = heapUsageRatioThreshold; + this.heapFlushMinRecords = heapFlushMinRecords; + this.heapUsageProvider = heapUsageProvider; + } + + public SortedPosDeleteWriter( + FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, + AuthenticatedFileIO io, + FileFormat format, + long mask, + long index, + StructLike partitionKey, + long recordsNumThreshold) { + this( + appenderFactory, + fileFactory, + io, + format, + mask, + index, + partitionKey, + recordsNumThreshold, + 0d, + 0); } public SortedPosDeleteWriter( @@ -98,7 +175,9 @@ public SortedPosDeleteWriter( mask, index, partitionKey, - DEFAULT_RECORDS_NUM_THRESHOLD); + DEFAULT_RECORDS_NUM_THRESHOLD, + 0d, + 0); } public SortedPosDeleteWriter( @@ -115,7 +194,31 @@ public SortedPosDeleteWriter( 0, 0, partitionKey, - DEFAULT_RECORDS_NUM_THRESHOLD); + DEFAULT_RECORDS_NUM_THRESHOLD, + 0d, + 0); + } + + public SortedPosDeleteWriter( + FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, + AuthenticatedFileIO io, + FileFormat format, + StructLike partitionKey, + long recordsNumThreshold, + double heapUsageRatioThreshold, + int heapFlushMinRecords) { + this( + appenderFactory, + fileFactory, + io, + format, + 0, + 0, + partitionKey, + recordsNumThreshold, + heapUsageRatioThreshold, + heapFlushMinRecords); } public void delete(CharSequence path, long pos) { @@ -132,9 +235,17 @@ public void 
delete(CharSequence path, long pos, T row) { records += 1; - // TODO Flush buffer based on the policy that checking whether whole heap memory size exceed the - // threshold. - if (records >= recordsNumThreshold) { + // Avoid querying JVM memory on every delete (may be costly on some JDKs). + // Sample heap usage every heapFlushMinRecords records at most. + boolean flushByHeap = false; + if (heapUsageRatioThreshold > 0 && heapUsageRatioThreshold < 1) { + int checkInterval = Math.max(1, heapFlushMinRecords); + if (records % checkInterval == 0) { + flushByHeap = shouldFlushByHeap(); + } + } + + if (records >= recordsNumThreshold || flushByHeap) { flushDeletes(); } } @@ -208,6 +319,24 @@ private void flushDeletes() { completedFiles.add(writer.toDeleteFile()); } + private boolean shouldFlushByHeap() { + // Guard: allow disabling heap-based flushing by setting an invalid ratio. + if (heapUsageRatioThreshold <= 0 || heapUsageRatioThreshold >= 1) { + return false; + } + // Guard: avoid flushing too frequently on small buffers. + if (records < Math.max(1, heapFlushMinRecords)) { + return false; + } + long max = heapUsageProvider.maxMemory(); + if (max <= 0) { + return false; + } + // Approximate current heap usage; do not force GC here. 
+ long used = heapUsageProvider.usedMemory(); + return used >= (long) (max * heapUsageRatioThreshold); + } + private static class PosRow { private final long pos; private final R row; diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/MixedIcebergRewriteExecutor.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/MixedIcebergRewriteExecutor.java index f274e300ae..fcf958b93c 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/MixedIcebergRewriteExecutor.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/MixedIcebergRewriteExecutor.java @@ -55,7 +55,8 @@ protected FileWriter, DeleteWriteResult> posWriter() { encryptionManager(), getTransactionId(input.rePosDeletedDataFilesForMixed()), baseLocation(), - table.spec()); + table.spec(), + table.properties()); } @Override diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java index 015c68d480..d4f4fdf848 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java @@ -283,6 +283,16 @@ private TableProperties() {} org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES; public static final long WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT = 134217728; // 128 MB + public static final String POS_DELETE_FLUSH_HEAP_RATIO = "pos-delete.flush.heap.ratio"; + public static final double POS_DELETE_FLUSH_HEAP_RATIO_DEFAULT = 0.8d; + + public static final String POS_DELETE_FLUSH_RECORDS = "pos-delete.flush.records"; + public static final long POS_DELETE_FLUSH_RECORDS_DEFAULT = Long.MAX_VALUE; + + public static final String POS_DELETE_FLUSH_HEAP_MIN_RECORDS = + "pos-delete.flush.heap.min-records"; + public static final int POS_DELETE_FLUSH_HEAP_MIN_RECORDS_DEFAULT = 1000; + public static final String UPSERT_ENABLED = 
"write.upsert.enabled"; public static final boolean UPSERT_ENABLED_DEFAULT = false; diff --git a/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestMixedTreeNodePosDeleteWriter.java b/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestMixedTreeNodePosDeleteWriter.java index 2f58540ac0..975ecad8c7 100644 --- a/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestMixedTreeNodePosDeleteWriter.java +++ b/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestMixedTreeNodePosDeleteWriter.java @@ -75,7 +75,8 @@ public void test() throws IOException { table.encryption(), 1L, table.location(), - table.spec()); + table.spec(), + table.properties()); writer.setTreeNode(DataTreeNode.ofId(4)); writer.delete("a", 0); diff --git a/amoro-format-iceberg/src/test/java/org/apache/amoro/io/writer/TestSortedPosDeleteWriterHeapFlush.java b/amoro-format-iceberg/src/test/java/org/apache/amoro/io/writer/TestSortedPosDeleteWriterHeapFlush.java new file mode 100644 index 0000000000..34ec0cfa89 --- /dev/null +++ b/amoro-format-iceberg/src/test/java/org/apache/amoro/io/writer/TestSortedPosDeleteWriterHeapFlush.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.amoro.io.writer; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.apache.amoro.BasicTableTestHelper; +import org.apache.amoro.TableFormat; +import org.apache.amoro.catalog.BasicCatalogTestHelper; +import org.apache.amoro.catalog.TableTestBase; +import org.apache.amoro.table.UnkeyedTable; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.MetricsModes; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.Record; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class TestSortedPosDeleteWriterHeapFlush extends TableTestBase { + + public TestSortedPosDeleteWriterHeapFlush() { + super( + new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG), + new BasicTableTestHelper(false, false)); + } + + @BeforeAll + public static void startAms() throws Exception { + TEST_AMS.before(); + } + + @AfterAll + public static void stopAms() { + TEST_AMS.after(); + } + + @BeforeEach + public void setup() throws Exception { + temp.create(); + setupCatalog(); + setupTable(); + } + + @AfterEach + public void tearDown() { + if (getCatalogMeta() != null) { + dropTable(); + dropCatalog(); + } + temp.delete(); + } + + @Test + public void testFlushTriggeredByHeapUsage() throws IOException { + SortedPosDeleteWriter writer = + newWriter( + 0.8d, + 2, + new SortedPosDeleteWriter.HeapUsageProvider() { + @Override + public long maxMemory() { + return 100L; + } + + @Override + public long usedMemory() { + return 90L; + } + }); + + writer.delete("file-a", 1L); + 
writer.delete("file-a", 2L); + writer.delete("file-a", 3L); + + List deleteFiles = writer.complete(); + assertEquals(2, deleteFiles.size()); + List recordCounts = + deleteFiles.stream().map(DeleteFile::recordCount).sorted().collect(Collectors.toList()); + assertEquals(Arrays.asList(1L, 2L), recordCounts); + } + + @Test + public void testHeapFlushDisabledByRatio() throws IOException { + SortedPosDeleteWriter writer = + newWriter( + 0d, + 1, + new SortedPosDeleteWriter.HeapUsageProvider() { + @Override + public long maxMemory() { + return 100L; + } + + @Override + public long usedMemory() { + return 100L; + } + }); + + writer.delete("file-b", 1L); + writer.delete("file-b", 2L); + writer.delete("file-b", 3L); + + List deleteFiles = writer.complete(); + assertEquals(1, deleteFiles.size()); + assertEquals(3L, deleteFiles.get(0).recordCount()); + } + + @Test + public void testHeapFlushDisabledByRatioAtLeastOne() throws IOException { + SortedPosDeleteWriter writer = + newWriter( + 1.0d, + 1, + new SortedPosDeleteWriter.HeapUsageProvider() { + @Override + public long maxMemory() { + return 100L; + } + + @Override + public long usedMemory() { + return 100L; + } + }); + + writer.delete("file-c", 1L); + writer.delete("file-c", 2L); + writer.delete("file-c", 3L); + + List deleteFiles = writer.complete(); + assertEquals(1, deleteFiles.size()); + assertEquals(3L, deleteFiles.get(0).recordCount()); + } + + @Test + public void testHeapFlushDisabledByNegativeRatio() throws IOException { + SortedPosDeleteWriter writer = + newWriter( + -0.1d, + 1, + new SortedPosDeleteWriter.HeapUsageProvider() { + @Override + public long maxMemory() { + return 100L; + } + + @Override + public long usedMemory() { + return 100L; + } + }); + + writer.delete("file-d", 1L); + writer.delete("file-d", 2L); + writer.delete("file-d", 3L); + + List deleteFiles = writer.complete(); + assertEquals(1, deleteFiles.size()); + assertEquals(3L, deleteFiles.get(0).recordCount()); + } + + @Test + public void 
testHeapFlushDisabledWhenMaxMemoryUnavailable() throws IOException { + SortedPosDeleteWriter writer = + newWriter( + 0.8d, + 1, + new SortedPosDeleteWriter.HeapUsageProvider() { + @Override + public long maxMemory() { + return 0L; + } + + @Override + public long usedMemory() { + return 100L; + } + }); + + writer.delete("file-e", 1L); + writer.delete("file-e", 2L); + writer.delete("file-e", 3L); + + List deleteFiles = writer.complete(); + assertEquals(1, deleteFiles.size()); + assertEquals(3L, deleteFiles.get(0).recordCount()); + } + + @Test + public void testHeapFlushWhenMinRecordsZero() throws IOException { + SortedPosDeleteWriter writer = + newWriter( + 0.8d, + 0, + new SortedPosDeleteWriter.HeapUsageProvider() { + @Override + public long maxMemory() { + return 100L; + } + + @Override + public long usedMemory() { + return 90L; + } + }); + + writer.delete("file-f", 1L); + writer.delete("file-f", 2L); + writer.delete("file-f", 3L); + + List deleteFiles = writer.complete(); + assertEquals(3, deleteFiles.size()); + assertEquals(Arrays.asList(1L, 1L, 1L), recordCounts(deleteFiles)); + } + + @Test + public void testMultipleHeapFlushCycles() throws IOException { + SortedPosDeleteWriter writer = + newWriter( + 0.8d, + 2, + new SortedPosDeleteWriter.HeapUsageProvider() { + @Override + public long maxMemory() { + return 100L; + } + + @Override + public long usedMemory() { + return 90L; + } + }); + + writer.delete("file-g", 1L); + writer.delete("file-g", 2L); + writer.delete("file-g", 3L); + writer.delete("file-g", 4L); + writer.delete("file-g", 5L); + + List deleteFiles = writer.complete(); + assertEquals(3, deleteFiles.size()); + assertEquals(Arrays.asList(1L, 2L, 2L), recordCounts(deleteFiles)); + } + + @Test + public void testRecordsNumThresholdTriggersWithoutHeapSampling() throws IOException { + SortedPosDeleteWriter writer = + newWriter( + 3L, + 0.8d, + 10, + new SortedPosDeleteWriter.HeapUsageProvider() { + @Override + public long maxMemory() { + return 100L; + } + + 
@Override + public long usedMemory() { + return 90L; + } + }); + + writer.delete("file-h", 1L); + writer.delete("file-h", 2L); + writer.delete("file-h", 3L); + + List deleteFiles = writer.complete(); + assertEquals(1, deleteFiles.size()); + assertEquals(3L, deleteFiles.get(0).recordCount()); + } + + private SortedPosDeleteWriter newWriter( + double heapUsageRatioThreshold, + int heapFlushMinRecords, + SortedPosDeleteWriter.HeapUsageProvider heapUsageProvider) { + return newWriter( + Long.MAX_VALUE, heapUsageRatioThreshold, heapFlushMinRecords, heapUsageProvider); + } + + private SortedPosDeleteWriter newWriter( + long recordsNumThreshold, + double heapUsageRatioThreshold, + int heapFlushMinRecords, + SortedPosDeleteWriter.HeapUsageProvider heapUsageProvider) { + UnkeyedTable base = getBaseStore(); + GenericAppenderFactory appenderFactory = new GenericAppenderFactory(base.schema(), base.spec()); + appenderFactory.setAll(getMixedTable().properties()); + appenderFactory.set( + org.apache.iceberg.TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX + + MetadataColumns.DELETE_FILE_PATH.name(), + MetricsModes.Full.get().toString()); + appenderFactory.set( + org.apache.iceberg.TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX + + MetadataColumns.DELETE_FILE_POS.name(), + MetricsModes.Full.get().toString()); + + StructLike partitionData = TestHelpers.Row.of(); + return new SortedPosDeleteWriter<>( + appenderFactory, + new CommonOutputFileFactory( + base.location(), + base.spec(), + FileFormat.PARQUET, + getMixedTable().io(), + base.encryption(), + 0, + 0, + 0L), + getMixedTable().io(), + FileFormat.PARQUET, + 0, + 0, + partitionData, + recordsNumThreshold, + heapUsageRatioThreshold, + heapFlushMinRecords, + heapUsageProvider); + } + + private List recordCounts(List deleteFiles) { + return deleteFiles.stream().map(DeleteFile::recordCount).sorted().collect(Collectors.toList()); + } +} diff --git 
a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/io/writer/AdaptHiveGenericTaskWriterBuilder.java b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/io/writer/AdaptHiveGenericTaskWriterBuilder.java index 2d5e44af06..5bc7a65f20 100644 --- a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/io/writer/AdaptHiveGenericTaskWriterBuilder.java +++ b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/io/writer/AdaptHiveGenericTaskWriterBuilder.java @@ -161,6 +161,21 @@ public SortedPosDeleteWriter buildBasePosDeleteWriter( org.apache.iceberg.TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX + MetadataColumns.DELETE_FILE_POS.name(), MetricsModes.Full.get().toString()); + double heapUsageRatioThreshold = + PropertyUtil.propertyAsDouble( + table.properties(), + TableProperties.POS_DELETE_FLUSH_HEAP_RATIO, + TableProperties.POS_DELETE_FLUSH_HEAP_RATIO_DEFAULT); + long recordsNumThreshold = + PropertyUtil.propertyAsLong( + table.properties(), + TableProperties.POS_DELETE_FLUSH_RECORDS, + TableProperties.POS_DELETE_FLUSH_RECORDS_DEFAULT); + int heapFlushMinRecords = + PropertyUtil.propertyAsInt( + table.properties(), + TableProperties.POS_DELETE_FLUSH_HEAP_MIN_RECORDS, + TableProperties.POS_DELETE_FLUSH_HEAP_MIN_RECORDS_DEFAULT); return new SortedPosDeleteWriter<>( appenderFactory, new CommonOutputFileFactory( @@ -176,7 +191,10 @@ public SortedPosDeleteWriter buildBasePosDeleteWriter( fileFormat, mask, index, - partitionKey); + partitionKey, + recordsNumThreshold, + heapUsageRatioThreshold, + heapFlushMinRecords); } private GenericBaseTaskWriter buildBaseWriter(LocationKind locationKind) { diff --git a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/MixedHiveRewriteExecutor.java b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/MixedHiveRewriteExecutor.java index 2c32301e4d..bf4ac56cde 100644 --- 
a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/MixedHiveRewriteExecutor.java +++ b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/optimizing/MixedHiveRewriteExecutor.java @@ -65,7 +65,8 @@ protected FileWriter, DeleteWriteResult> posWriter() { encryptionManager(), getTransactionId(input.rePosDeletedDataFilesForMixed()), baseLocation(), - table.spec()); + table.spec(), + table.properties()); } @Override diff --git a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/java/org/apache/amoro/spark/io/UnkeyedUpsertSparkWriter.java b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/java/org/apache/amoro/spark/io/UnkeyedUpsertSparkWriter.java index 969ee9bb8b..2b6da019b4 100644 --- a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/java/org/apache/amoro/spark/io/UnkeyedUpsertSparkWriter.java +++ b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/java/org/apache/amoro/spark/io/UnkeyedUpsertSparkWriter.java @@ -28,6 +28,7 @@ import org.apache.amoro.spark.SparkInternalRowCastWrapper; import org.apache.amoro.spark.SparkInternalRowWrapper; import org.apache.amoro.table.MixedTable; +import org.apache.amoro.table.TableProperties; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; @@ -38,6 +39,7 @@ import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.util.PropertyUtil; import org.apache.spark.sql.catalyst.InternalRow; import java.io.IOException; @@ -57,6 +59,9 @@ public class UnkeyedUpsertSparkWriter implements TaskWriter { private final Schema schema; private final MixedTable table; private final SparkBaseTaskWriter writer; + private final double heapUsageRatioThreshold; + private final long recordsNumThreshold; + private final int heapFlushMinRecords; private 
final Map> writerMap = new HashMap<>(); private boolean closed = false; @@ -73,6 +78,21 @@ public UnkeyedUpsertSparkWriter( this.format = format; this.schema = schema; this.writer = writer; + this.heapUsageRatioThreshold = + PropertyUtil.propertyAsDouble( + table.properties(), + TableProperties.POS_DELETE_FLUSH_HEAP_RATIO, + TableProperties.POS_DELETE_FLUSH_HEAP_RATIO_DEFAULT); + this.recordsNumThreshold = + PropertyUtil.propertyAsLong( + table.properties(), + TableProperties.POS_DELETE_FLUSH_RECORDS, + TableProperties.POS_DELETE_FLUSH_RECORDS_DEFAULT); + this.heapFlushMinRecords = + PropertyUtil.propertyAsInt( + table.properties(), + TableProperties.POS_DELETE_FLUSH_HEAP_MIN_RECORDS, + TableProperties.POS_DELETE_FLUSH_HEAP_MIN_RECORDS_DEFAULT); } @Override @@ -90,7 +110,14 @@ public void write(T row) throws IOException { if (writerMap.get(partitionKey) == null) { SortedPosDeleteWriter writer = new SortedPosDeleteWriter<>( - appenderFactory, fileFactory, table.io(), format, partitionKey); + appenderFactory, + fileFactory, + table.io(), + format, + partitionKey, + recordsNumThreshold, + heapUsageRatioThreshold, + heapFlushMinRecords); writerMap.putIfAbsent(partitionKey, writer); } if (internalRow.getChangeAction() == ChangeAction.DELETE) { diff --git a/amoro-format-mixed/amoro-mixed-spark/v3.4/amoro-mixed-spark-3.4/src/main/java/org/apache/amoro/spark/io/UnkeyedUpsertSparkWriter.java b/amoro-format-mixed/amoro-mixed-spark/v3.4/amoro-mixed-spark-3.4/src/main/java/org/apache/amoro/spark/io/UnkeyedUpsertSparkWriter.java index 969ee9bb8b..2b6da019b4 100644 --- a/amoro-format-mixed/amoro-mixed-spark/v3.4/amoro-mixed-spark-3.4/src/main/java/org/apache/amoro/spark/io/UnkeyedUpsertSparkWriter.java +++ b/amoro-format-mixed/amoro-mixed-spark/v3.4/amoro-mixed-spark-3.4/src/main/java/org/apache/amoro/spark/io/UnkeyedUpsertSparkWriter.java @@ -28,6 +28,7 @@ import org.apache.amoro.spark.SparkInternalRowCastWrapper; import org.apache.amoro.spark.SparkInternalRowWrapper; import 
org.apache.amoro.table.MixedTable; +import org.apache.amoro.table.TableProperties; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; @@ -38,6 +39,7 @@ import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.util.PropertyUtil; import org.apache.spark.sql.catalyst.InternalRow; import java.io.IOException; @@ -57,6 +59,9 @@ public class UnkeyedUpsertSparkWriter implements TaskWriter { private final Schema schema; private final MixedTable table; private final SparkBaseTaskWriter writer; + private final double heapUsageRatioThreshold; + private final long recordsNumThreshold; + private final int heapFlushMinRecords; private final Map> writerMap = new HashMap<>(); private boolean closed = false; @@ -73,6 +78,21 @@ public UnkeyedUpsertSparkWriter( this.format = format; this.schema = schema; this.writer = writer; + this.heapUsageRatioThreshold = + PropertyUtil.propertyAsDouble( + table.properties(), + TableProperties.POS_DELETE_FLUSH_HEAP_RATIO, + TableProperties.POS_DELETE_FLUSH_HEAP_RATIO_DEFAULT); + this.recordsNumThreshold = + PropertyUtil.propertyAsLong( + table.properties(), + TableProperties.POS_DELETE_FLUSH_RECORDS, + TableProperties.POS_DELETE_FLUSH_RECORDS_DEFAULT); + this.heapFlushMinRecords = + PropertyUtil.propertyAsInt( + table.properties(), + TableProperties.POS_DELETE_FLUSH_HEAP_MIN_RECORDS, + TableProperties.POS_DELETE_FLUSH_HEAP_MIN_RECORDS_DEFAULT); } @Override @@ -90,7 +110,14 @@ public void write(T row) throws IOException { if (writerMap.get(partitionKey) == null) { SortedPosDeleteWriter writer = new SortedPosDeleteWriter<>( - appenderFactory, fileFactory, table.io(), format, partitionKey); + appenderFactory, + fileFactory, + table.io(), + format, + partitionKey, + recordsNumThreshold, + heapUsageRatioThreshold, + heapFlushMinRecords); writerMap.putIfAbsent(partitionKey, writer); } if 
(internalRow.getChangeAction() == ChangeAction.DELETE) { diff --git a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/java/org/apache/amoro/spark/io/UnkeyedUpsertSparkWriter.java b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/java/org/apache/amoro/spark/io/UnkeyedUpsertSparkWriter.java index 969ee9bb8b..2b6da019b4 100644 --- a/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/java/org/apache/amoro/spark/io/UnkeyedUpsertSparkWriter.java +++ b/amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/java/org/apache/amoro/spark/io/UnkeyedUpsertSparkWriter.java @@ -28,6 +28,7 @@ import org.apache.amoro.spark.SparkInternalRowCastWrapper; import org.apache.amoro.spark.SparkInternalRowWrapper; import org.apache.amoro.table.MixedTable; +import org.apache.amoro.table.TableProperties; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; @@ -38,6 +39,7 @@ import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.util.PropertyUtil; import org.apache.spark.sql.catalyst.InternalRow; import java.io.IOException; @@ -57,6 +59,9 @@ public class UnkeyedUpsertSparkWriter implements TaskWriter { private final Schema schema; private final MixedTable table; private final SparkBaseTaskWriter writer; + private final double heapUsageRatioThreshold; + private final long recordsNumThreshold; + private final int heapFlushMinRecords; private final Map> writerMap = new HashMap<>(); private boolean closed = false; @@ -73,6 +78,21 @@ public UnkeyedUpsertSparkWriter( this.format = format; this.schema = schema; this.writer = writer; + this.heapUsageRatioThreshold = + PropertyUtil.propertyAsDouble( + table.properties(), + TableProperties.POS_DELETE_FLUSH_HEAP_RATIO, + TableProperties.POS_DELETE_FLUSH_HEAP_RATIO_DEFAULT); + this.recordsNumThreshold = + 
PropertyUtil.propertyAsLong( + table.properties(), + TableProperties.POS_DELETE_FLUSH_RECORDS, + TableProperties.POS_DELETE_FLUSH_RECORDS_DEFAULT); + this.heapFlushMinRecords = + PropertyUtil.propertyAsInt( + table.properties(), + TableProperties.POS_DELETE_FLUSH_HEAP_MIN_RECORDS, + TableProperties.POS_DELETE_FLUSH_HEAP_MIN_RECORDS_DEFAULT); } @Override @@ -90,7 +110,14 @@ public void write(T row) throws IOException { if (writerMap.get(partitionKey) == null) { SortedPosDeleteWriter writer = new SortedPosDeleteWriter<>( - appenderFactory, fileFactory, table.io(), format, partitionKey); + appenderFactory, + fileFactory, + table.io(), + format, + partitionKey, + recordsNumThreshold, + heapUsageRatioThreshold, + heapFlushMinRecords); writerMap.putIfAbsent(partitionKey, writer); } if (internalRow.getChangeAction() == ChangeAction.DELETE) {