diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/utils/IcebergTableUtil.java b/amoro-ams/src/main/java/org/apache/amoro/server/utils/IcebergTableUtil.java index 0b96dddd80..d7d388b853 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/utils/IcebergTableUtil.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/utils/IcebergTableUtil.java @@ -274,16 +274,23 @@ public static AbstractOptimizingPlanner createOptimizingPlanner( MixedTable table, double availableCore, long maxInputSizePerThread) { - Expression partitionFilter = - tableRuntime.getPendingInput() == null - ? Expressions.alwaysTrue() - : tableRuntime.getPendingInput().getPartitions().entrySet().stream() - .map( - entry -> - ExpressionUtil.convertPartitionDataToDataFilter( - table, entry.getKey(), entry.getValue())) - .reduce(Expressions::or) - .orElse(Expressions.alwaysTrue()); + AbstractOptimizingEvaluator.PendingInput pendingInput = tableRuntime.getPendingInput(); + Expression partitionFilter; + if (pendingInput == null) { + partitionFilter = Expressions.alwaysTrue(); + } else { + // Rebuild the in-memory partitions map from serialized partition paths, + // since the partitions field is @JsonIgnore and lost during DB persistence. + pendingInput.rebuildPartitions(table); + partitionFilter = + pendingInput.getPartitions().entrySet().stream() + .map( + entry -> + ExpressionUtil.convertPartitionDataToDataFilter( + table, entry.getKey(), entry.getValue())) + .reduce(Expressions::or) + .orElse(Expressions.alwaysTrue()); + } long processId = snowflakeIdGenerator.generateId(); ServerTableIdentifier identifier = tableRuntime.getTableIdentifier(); OptimizingConfig config = tableRuntime.getOptimizingConfig(); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java index 8b67cb5685..dfdad3c262 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java @@ -33,6 +33,7 @@ import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.apache.amoro.shade.guava32.com.google.common.collect.Sets; +import org.apache.amoro.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.amoro.table.TableProperties; import org.apache.amoro.table.TableSnapshot; import org.apache.iceberg.DataFile; @@ -122,6 +123,65 @@ public void testFragmentFiles() { assertInput(pendingInput, FileInfo.buildFileInfo(dataFiles)); } + @Test + public void testPendingInputSerializationRoundTrip() throws Exception { + closeFullOptimizingInterval(); + updateBaseHashBucket(1); + List newRecords = + OptimizingTestHelpers.generateRecord(tableTestHelper(), 1, 4, "2022-01-01T12:00:00"); + long transactionId = beginTransaction(); + OptimizingTestHelpers.appendBase( + getMixedTable(), + tableTestHelper().writeBaseStore(getMixedTable(), transactionId, newRecords, false)); + + newRecords = + OptimizingTestHelpers.generateRecord(tableTestHelper(), 5, 8, "2022-01-01T12:00:00"); + transactionId = beginTransaction(); + OptimizingTestHelpers.appendBase( + getMixedTable(), + tableTestHelper().writeBaseStore(getMixedTable(), transactionId, newRecords, false)); + + AbstractOptimizingEvaluator evaluator = buildOptimizingEvaluator(); + Assert.assertTrue(evaluator.isNecessary()); + AbstractOptimizingEvaluator.PendingInput original = evaluator.getOptimizingPendingInput(); + Assert.assertFalse(original.getPartitions().isEmpty()); + Assert.assertFalse(original.getPartitionPaths().isEmpty()); + + // Build partition paths before serialization (simulating what happens before DB persistence) + original.buildPartitionPaths(getMixedTable()); + + // Serialize to JSON and deserialize back (simulating DB persistence round-trip) + ObjectMapper mapper = new ObjectMapper(); + String json = mapper.writeValueAsString(original); + AbstractOptimizingEvaluator.PendingInput deserialized = + mapper.readValue(json, AbstractOptimizingEvaluator.PendingInput.class); + + // After deserialization, partitions map should be empty (it's @JsonIgnore) + Assert.assertTrue(deserialized.getPartitions().isEmpty()); + // But partitionPaths should be preserved + Assert.assertEquals(original.getPartitionPaths(), deserialized.getPartitionPaths()); + + // Rebuild partitions from paths + deserialized.rebuildPartitions(getMixedTable()); + Assert.assertFalse(deserialized.getPartitions().isEmpty()); + Assert.assertEquals(original.getPartitions().size(), deserialized.getPartitions().size()); + // Verify each specId has the same number of partitions + for (Map.Entry> entry : original.getPartitions().entrySet()) { + Assert.assertTrue(deserialized.getPartitions().containsKey(entry.getKey())); + Assert.assertEquals( + entry.getValue().size(), deserialized.getPartitions().get(entry.getKey()).size()); + } + + // Verify other fields are preserved + Assert.assertEquals(original.getDataFileCount(), deserialized.getDataFileCount()); + Assert.assertEquals(original.getDataFileSize(), deserialized.getDataFileSize()); + Assert.assertEquals( + original.getEqualityDeleteFileCount(), deserialized.getEqualityDeleteFileCount()); + Assert.assertEquals( + original.getPositionalDeleteFileCount(), deserialized.getPositionalDeleteFileCount()); + Assert.assertEquals(original.getHealthScore(), deserialized.getHealthScore()); + } + @Test public void testFragmentFilesWithPartitionFilterTimeStamp() { getMixedTable() diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java index 5f9061987e..a4c87fc170 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java @@ -29,10 +29,14 @@ import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.apache.amoro.shade.guava32.com.google.common.collect.Sets; import org.apache.amoro.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.amoro.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.amoro.table.KeyedTableSnapshot; import org.apache.amoro.table.MixedTable; import org.apache.amoro.table.TableSnapshot; import org.apache.amoro.utils.ExpressionUtil; +import org.apache.amoro.utils.MixedDataFiles; +import org.apache.amoro.utils.MixedTableUtil; +import org.apache.amoro.utils.TablePropertyUtil; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotSummary; @@ -166,22 +170,29 @@ public PendingInput getPendingInput() { // to be inconsistent with the snapshot summary of iceberg if (TableFormat.ICEBERG == mixedTable.format()) { Snapshot snapshot = mixedTable.asUnkeyedTable().snapshot(currentSnapshot.snapshotId()); - return new PendingInput(partitionPlanMap.values(), snapshot); + PendingInput pendingInput = new PendingInput(partitionPlanMap.values(), snapshot); + pendingInput.buildPartitionPaths(mixedTable); + return pendingInput; } - return new PendingInput(partitionPlanMap.values()); + PendingInput pendingInput = new PendingInput(partitionPlanMap.values()); + pendingInput.buildPartitionPaths(mixedTable); + return pendingInput; } public PendingInput getOptimizingPendingInput() { if (!isInitialized) { initEvaluator(); } - return new PendingInput(needOptimizingPlanMap.values()); + PendingInput pendingInput = new PendingInput(needOptimizingPlanMap.values()); + pendingInput.buildPartitionPaths(mixedTable); + return pendingInput; } public static class PendingInput { @JsonIgnore private final Map> partitions = Maps.newHashMap(); + private Set partitionPaths = Sets.newHashSet(); private int totalFileCount = 0; private long totalFileSize = 0L; @@ -275,6 +286,72 @@ public Map> getPartitions() { return partitions; } + /** + * Get serializable partition paths for JSON persistence. Each entry is in the format + * "specId:partitionPath", e.g. "1:dt=2024-01-01/hour=00". + */ + @JsonProperty("partitionPaths") + public Set getPartitionPaths() { + return partitionPaths; + } + + @JsonProperty("partitionPaths") + public void setPartitionPaths(Set partitionPaths) { + this.partitionPaths = partitionPaths != null ? partitionPaths : Sets.newHashSet(); + } + + /** + * Rebuild the in-memory partitions map from serializable partition paths. Must be called before + * {@link #getPartitions()} when PendingInput is deserialized from JSON. + * + * @param table the table to resolve partition specs + */ + public void rebuildPartitions(MixedTable table) { + if (!partitions.isEmpty() || partitionPaths.isEmpty()) { + return; + } + for (String path : partitionPaths) { + try { + int colonIdx = path.indexOf(':'); + if (colonIdx <= 0) { + continue; + } + int specId = Integer.parseInt(path.substring(0, colonIdx)); + String partitionPath = path.substring(colonIdx + 1); + PartitionSpec spec = MixedTableUtil.getMixedTablePartitionSpecById(table, specId); + if (spec != null) { + StructLike struct; + if (spec.isUnpartitioned() || partitionPath.isEmpty()) { + // For unpartitioned tables, use an empty record since there's no path to parse + struct = TablePropertyUtil.EMPTY_STRUCT; + } else { + struct = MixedDataFiles.data(spec, partitionPath); + } + partitions.computeIfAbsent(specId, k -> Sets.newHashSet()).add(struct); + } + } catch (Exception e) { + LOG.warn("Failed to rebuild partition from path: {}", path, e); + } + } + } + + /** Build serializable partition paths from in-memory partitions. Called before persisting. */ + public void buildPartitionPaths(MixedTable table) { + if (!partitionPaths.isEmpty()) { + return; + } + for (Map.Entry> entry : partitions.entrySet()) { + int specId = entry.getKey(); + PartitionSpec spec = MixedTableUtil.getMixedTablePartitionSpecById(table, specId); + if (spec == null) { + continue; + } + for (StructLike struct : entry.getValue()) { + partitionPaths.add(specId + ":" + spec.partitionToPath(struct)); + } + } + } + public int getDataFileCount() { return dataFileCount; } diff --git a/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/plan/TestPendingInputPartitionSerialization.java b/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/plan/TestPendingInputPartitionSerialization.java new file mode 100644 index 0000000000..2f5f3e3ad9 --- /dev/null +++ b/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/plan/TestPendingInputPartitionSerialization.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.optimizing.plan; + +import org.apache.amoro.TableFormat; +import org.apache.amoro.optimizing.plan.AbstractOptimizingEvaluator.PendingInput; +import org.apache.amoro.shade.guava32.com.google.common.collect.Sets; +import org.apache.amoro.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.amoro.table.MixedTable; +import org.apache.amoro.utils.MixedDataFiles; +import org.apache.amoro.utils.TablePropertyUtil; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +/** + * Tests for PendingInput partition serialization round-trip with custom partition transforms + * (month, hour) and unpartitioned tables. Verifies that MixedDataFiles.data() correctly handles + * transforms that Iceberg's native DataFiles.data() cannot parse. + */ +public class TestPendingInputPartitionSerialization { + + private static final Schema SCHEMA = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "ts", Types.LongType.get()), + Types.NestedField.required(3, "op_time", Types.TimestampType.withoutZone())); + + private MixedTable mockTable(PartitionSpec spec) { + MixedTable mock = Mockito.mock(MixedTable.class); + Mockito.when(mock.format()).thenReturn(TableFormat.MIXED_ICEBERG); + Mockito.when(mock.spec()).thenReturn(spec); + return mock; + } + + /** + * Test rebuildPartitions with month partition transform. Month partitions from Mixed-format are + * serialized as "yyyy-MM" (e.g., "month=2024-01"), which Iceberg's native DataFiles.data() cannot + * parse (it expects an integer month offset). MixedDataFiles.data() handles this correctly. + */ + @Test + public void testMonthPartitionTransformRebuild() throws Exception { + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).month("op_time", "month").build(); + MixedTable mockTable = mockTable(spec); + + // Directly construct partition path as Mixed-format serializes it: "month=2024-01" + String partitionPath = "month=2024-01"; + String fullPath = spec.specId() + ":" + partitionPath; + + PendingInput input = new PendingInput(); + input.getPartitionPaths().add(fullPath); + + // Verify MixedDataFiles.data() can parse this + StructLike parsed = MixedDataFiles.data(spec, partitionPath); + Assert.assertNotNull("MixedDataFiles should parse month partition", parsed); + + // Rebuild partitions from paths + input.rebuildPartitions(mockTable); + Assert.assertFalse("Partitions should be rebuilt", input.getPartitions().isEmpty()); + Assert.assertEquals(1, input.getPartitions().get(spec.specId()).size()); + } + + /** + * Test rebuildPartitions with hour partition transform. Hour partitions from Mixed-format are + * serialized as "yyyy-MM-dd-HH", which Iceberg's native parser cannot handle. + */ + @Test + public void testHourPartitionTransformRebuild() throws Exception { + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).hour("op_time", "hour").build(); + MixedTable mockTable = mockTable(spec); + + // Directly construct partition path as Mixed-format serializes it: "hour=2024-01-15-10" + String partitionPath = "hour=2024-01-15-10"; + String fullPath = spec.specId() + ":" + partitionPath; + + PendingInput input = new PendingInput(); + input.getPartitionPaths().add(fullPath); + + // Verify MixedDataFiles.data() can parse this + StructLike parsed = MixedDataFiles.data(spec, partitionPath); + Assert.assertNotNull("MixedDataFiles should parse hour partition", parsed); + + // Rebuild partitions from paths + input.rebuildPartitions(mockTable); + Assert.assertFalse("Partitions should be rebuilt", input.getPartitions().isEmpty()); + Assert.assertEquals(1, input.getPartitions().get(spec.specId()).size()); + } + + /** + * Test full round-trip: buildPartitionPaths -> JSON serialize -> JSON deserialize -> + * rebuildPartitions for month transform. This simulates the complete DB persistence cycle. + */ + @Test + public void testMonthPartitionFullRoundTrip() throws Exception { + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).month("op_time", "month").build(); + MixedTable mockTable = mockTable(spec); + + PendingInput input = new PendingInput(); + // Add two different month partitions + input.getPartitionPaths().add(spec.specId() + ":month=2024-01"); + input.getPartitionPaths().add(spec.specId() + ":month=2024-06"); + + // JSON round-trip + ObjectMapper mapper = new ObjectMapper(); + String json = mapper.writeValueAsString(input); + PendingInput deserialized = mapper.readValue(json, PendingInput.class); + + Assert.assertEquals(2, deserialized.getPartitionPaths().size()); + + // Rebuild + deserialized.rebuildPartitions(mockTable); + Assert.assertEquals(2, deserialized.getPartitions().get(spec.specId()).size()); + } + + /** + * Test round-trip for unpartitioned tables. Unpartitioned tables use EMPTY_STRUCT and should + * serialize as "specId:" with empty path. + */ + @Test + public void testUnpartitionedTableRoundTrip() throws Exception { + PartitionSpec spec = PartitionSpec.unpartitioned(); + MixedTable mockTable = mockTable(spec); + + PendingInput input = new PendingInput(); + input + .getPartitions() + .computeIfAbsent(spec.specId(), k -> Sets.newHashSet()) + .add(TablePropertyUtil.EMPTY_STRUCT); + + // Build partition paths + input.buildPartitionPaths(mockTable); + Assert.assertFalse(input.getPartitionPaths().isEmpty()); + + String path = input.getPartitionPaths().iterator().next(); + Assert.assertTrue( + "Unpartitioned path should be specId: with empty path, got: " + path, + path.equals(spec.specId() + ":")); + + // JSON round-trip + ObjectMapper mapper = new ObjectMapper(); + String json = mapper.writeValueAsString(input); + PendingInput deserialized = mapper.readValue(json, PendingInput.class); + Assert.assertEquals(input.getPartitionPaths(), deserialized.getPartitionPaths()); + + // Rebuild + deserialized.rebuildPartitions(mockTable); + Assert.assertFalse(deserialized.getPartitions().isEmpty()); + Assert.assertEquals(1, deserialized.getPartitions().get(spec.specId()).size()); + + // Verify the rebuilt struct has 0 fields (EMPTY_STRUCT) + StructLike rebuilt = deserialized.getPartitions().get(spec.specId()).iterator().next(); + Assert.assertEquals(0, rebuilt.size()); + } + + /** Test that Hive null sentinel is handled without throwing exceptions. */ + @Test + public void testHiveNullSentinel() throws Exception { + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).month("op_time", "month").build(); + MixedTable mockTable = mockTable(spec); + + PendingInput input = new PendingInput(); + input.getPartitionPaths().add(spec.specId() + ":month=__HIVE_DEFAULT_PARTITION__"); + + // Rebuild should not throw + input.rebuildPartitions(mockTable); + // Result may vary - key is no exception + } + + /** Test that empty PendingInput (no partitions) round-trips correctly. */ + @Test + public void testEmptyPendingInput() throws Exception { + PartitionSpec spec = PartitionSpec.unpartitioned(); + MixedTable mockTable = mockTable(spec); + + PendingInput input = new PendingInput(); + // Both partitions and partitionPaths are empty + + input.buildPartitionPaths(mockTable); + Assert.assertTrue( + "Empty input should have no partition paths", input.getPartitionPaths().isEmpty()); + + input.rebuildPartitions(mockTable); + Assert.assertTrue("Empty input should have no partitions", input.getPartitions().isEmpty()); + } +}