From 4da05589cb081bbfa8a31a9eaa0afc6b01968158 Mon Sep 17 00:00:00 2001
From: Gerald Berger
Date: Tue, 21 Apr 2026 18:03:54 +0200
Subject: [PATCH 1/5] fix(incremental-scan): treat EXISTING manifest entries
 as appends when from=None

When `from_snapshot_id` is None (scan from the beginning of time), the
incremental scan now uses full-scan semantics instead of silently dropping
EXISTING manifest entries. Previously, data files from expired snapshots
that appear as EXISTING entries in later manifests were missed, causing
incomplete results.

Changes:

- `IncrementalPlanContext` gains a `from_snapshot_id: Option<i64>` field to
  track whether the user provided an explicit starting snapshot.
- `build_manifest_file_contexts`: when `from=None`, reads ALL manifests from
  `to_snapshot`'s manifest list without filtering by `added_snapshot_id`,
  and clears `filter_fn` so entries from expired snapshots pass through.
- `plan_files`: when `from=None`, treats EXISTING data entries as appends
  (alongside Added entries) and skips Deleted entries; also skips
  `from_snapshot_collection` since equality deletes are already handled
  per-file via `AppendedFileScanTask`.
- Adds a regression test that simulates an expired snapshot: snapshot 1 is
  removed from the catalog metadata while its data files (referenced as
  EXISTING entries in snapshot 2's manifest) must still appear in the
  append stream.

Co-Authored-By: Claude Sonnet 4.6
---
 .../iceberg/src/scan/incremental/context.rs  |  41 +++++-
 crates/iceberg/src/scan/incremental/mod.rs   |  22 +++-
 crates/iceberg/src/scan/incremental/tests.rs | 124 ++++++++++++++++++
 3 files changed, 176 insertions(+), 11 deletions(-)

diff --git a/crates/iceberg/src/scan/incremental/context.rs b/crates/iceberg/src/scan/incremental/context.rs
index c60c3461d8..874e09b4f1 100644
--- a/crates/iceberg/src/scan/incremental/context.rs
+++ b/crates/iceberg/src/scan/incremental/context.rs
@@ -38,6 +38,11 @@ pub(crate) struct IncrementalPlanContext {
     /// The snapshot to start the incremental scan from.
     pub from_snapshot: SnapshotRef,
 
+    /// The user-provided from_snapshot_id (None means "scan from the beginning").
+    /// Used to distinguish a true "from beginning" scan (full scan semantics) from
+    /// an incremental scan with an explicit starting snapshot.
+    pub from_snapshot_id: Option<i64>,
+
     /// The metadata of the table being scanned.
     pub table_metadata: TableMetadataRef,
 
@@ -67,13 +72,41 @@ impl IncrementalPlanContext {
         delete_file_idx: DeleteFileIndex,
         delete_file_tx: Sender<ManifestEntryContext>,
     ) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>> + 'static>> {
-        // Collect all snapshot IDs (all operation types are supported)
-        let snapshot_ids: HashSet<i64> = self.snapshots.iter().map(|s| s.snapshot_id()).collect();
-
         // Separate delete and data manifests to ensure deletes are processed first.
         // This prevents deadlock by ensuring delete processing completes
         // (and builds the delete filter) before data manifests are fetched.
-        let (delete_manifests, data_manifests, filter_fn) = {
+        let (delete_manifests, data_manifests, filter_fn) = if self.from_snapshot_id.is_none() {
+            // When from=None, use full-scan semantics: read ALL manifests from to_snapshot's
+            // manifest list without filtering by added_snapshot_id. This handles tables where
+            // older snapshots have been expired — their data files appear as EXISTING entries
+            // in later manifests and would otherwise be silently dropped.
+            let to_snapshot = self.snapshots.first().expect("snapshots is non-empty");
+            let manifest_list = self
+                .object_cache
+                .get_manifest_list(to_snapshot, &self.table_metadata)
+                .await?;
+
+            let mut delete_manifests = HashSet::<ManifestFile>::new();
+            let mut data_manifests = HashSet::<ManifestFile>::new();
+            for entry in manifest_list.entries() {
+                if entry.content == ManifestContentType::Deletes {
+                    delete_manifests.insert(entry.clone());
+                } else {
+                    data_manifests.insert(entry.clone());
+                }
+            }
+
+            // No entry-level filtering: include entries from expired snapshots too.
+            (
+                delete_manifests,
+                data_manifests,
+                None::<Arc<ManifestEntryFilterFn>>,
+            )
+        } else {
+            // Collect all snapshot IDs (all operation types are supported)
+            let snapshot_ids: HashSet<i64> =
+                self.snapshots.iter().map(|s| s.snapshot_id()).collect();
+
             let mut delete_manifests = HashSet::<ManifestFile>::new();
             let mut data_manifests = HashSet::<ManifestFile>::new();
diff --git a/crates/iceberg/src/scan/incremental/mod.rs b/crates/iceberg/src/scan/incremental/mod.rs
index 74b9090f4e..690f88e8b5 100644
--- a/crates/iceberg/src/scan/incremental/mod.rs
+++ b/crates/iceberg/src/scan/incremental/mod.rs
@@ -370,6 +370,7 @@ impl<'a> IncrementalTableScanBuilder<'a> {
         let plan_context = IncrementalPlanContext {
             snapshots,
             from_snapshot: snapshot_from,
+            from_snapshot_id: self.from_snapshot_id,
             table_metadata: self.table.metadata_ref(),
             to_snapshot_schema: schema,
             object_cache: self.table.object_cache().clone(),
@@ -504,8 +505,11 @@ impl IncrementalTableScan {
 
         // Collect data files that were live at from_snapshot. These are needed to generate
         // equality delete tasks for files that predate the scan range.
-        // Runs in parallel with the rest of the planning.
-        let from_snapshot_collection = if all_deletes.iter().any(is_equality_delete) {
+        // Skipped when from=None because in that case all data files are treated as appends
+        // (full-scan semantics), so equality deletes are handled via AppendedFileScanTask.
+        let from_snapshot_collection = if self.plan_context.from_snapshot_id.is_some()
+            && all_deletes.iter().any(is_equality_delete)
+        {
             Some(Self::spawn_manifest_entry_collection(
                 self.plan_context.from_snapshot.clone(),
                 self.plan_context.table_metadata.clone(),
@@ -536,6 +540,7 @@ impl IncrementalTableScan {
         // Process the data file [`ManifestEntry`] stream in parallel
         let filter = delete_filter.clone();
         let table_metadata = self.plan_context.table_metadata.clone();
+        let from_snapshot_is_none = self.plan_context.from_snapshot_id.is_none();
         spawn(async move {
             let result = manifest_entry_data_ctx_rx
                 .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
@@ -545,8 +550,13 @@ impl IncrementalTableScan {
                     let filter = filter.clone();
                     let table_metadata = table_metadata.clone();
                     async move {
-                        if manifest_entry_context.manifest_entry.status()
-                            == ManifestStatus::Added
+                        let status = manifest_entry_context.manifest_entry.status();
+                        // When from=None (full-scan semantics), treat both Added and Existing
+                        // entries as appends. Existing entries arise from expired snapshots or
+                        // manifest rewrites — they represent live files that must be included.
+                        // Deleted entries are skipped since they are not live.
+                        if status == ManifestStatus::Added
+                            || (from_snapshot_is_none && status == ManifestStatus::Existing)
                         {
                             spawn(async move {
                                 Self::process_data_manifest_entry(
@@ -558,9 +568,7 @@ impl IncrementalTableScan {
                                 .await
                             })
                             .await
-                        } else if manifest_entry_context.manifest_entry.status()
-                            == ManifestStatus::Deleted
-                        {
+                        } else if status == ManifestStatus::Deleted && !from_snapshot_is_none {
                             spawn(async move {
                                 Self::process_deleted_data_manifest_entry(
                                     tx,
diff --git a/crates/iceberg/src/scan/incremental/tests.rs b/crates/iceberg/src/scan/incremental/tests.rs
index 468e834b76..7115929a23 100644
--- a/crates/iceberg/src/scan/incremental/tests.rs
+++ b/crates/iceberg/src/scan/incremental/tests.rs
@@ -2692,6 +2692,130 @@ async fn test_incremental_scan_includes_root_when_from_is_none() {
     );
 }
 
+#[tokio::test]
+async fn test_incremental_scan_from_none_includes_existing_entries_from_expired_snapshots() {
+    // Simulate a table where older snapshots have been expired from the catalog.
+    // The only visible snapshot's manifest has EXISTING entries for data files that
+    // were added by the now-expired snapshot. Their snapshot_id is not in the
+    // accessible ancestry, so the old incremental-scan code dropped them silently.
+    // After the fix, from=None uses full-scan semantics and includes EXISTING entries.
+
+    let fixture = IncrementalTestFixture::new(vec![
+        // Snapshot 1 (will be expired): adds rows 1 and 2
+        Operation::Add(
+            vec![(1, "a".to_string()), (2, "b".to_string())],
+            "snap1-data.parquet".to_string(),
+        ),
+        // Snapshot 2 (remains visible): adds row 3
+        Operation::Add(vec![(3, "c".to_string())], "snap2-data.parquet".to_string()),
+    ])
+    .await;
+
+    // Snapshot 2's manifest (snap-2-manifest-list.avro) contains:
+    //   - EXISTING entry for snap1-data.parquet (snapshot_id = 1, i.e. expired)
+    //   - ADDED entry for snap2-data.parquet (snapshot_id = 2)
+    //
+    // Build table metadata that includes ONLY snapshot 2, simulating expiry of
+    // snapshot 1. Snapshot 2's parent-snapshot-id still points at 1, but since 1
+    // is absent, ancestors_of() stops at 2, giving snapshot_ids = {2}.
+    // Snapshot 2's manifest has Existing entries with snapshot_id=1 which is NOT in
+    // {2}, so the old code silently dropped them.
+    let snap2_manifest_list = format!(
+        "{}/metadata/snap-2-manifest-list.avro",
+        fixture.table_location
+    );
+    let metadata_json = format!(
+        r#"{{
+    "format-version": 2,
+    "table-uuid": "{uuid}",
+    "location": "{loc}",
+    "last-sequence-number": 1,
+    "last-updated-ms": 1602638573590,
+    "last-column-id": 2,
+    "current-schema-id": 0,
+    "schemas": [
+        {{
+            "type": "struct",
+            "schema-id": 0,
+            "fields": [
+                {{"id": 1, "name": "n", "required": true, "type": "int"}},
+                {{"id": 2, "name": "data", "required": true, "type": "string"}}
+            ]
+        }}
+    ],
+    "default-spec-id": 0,
+    "partition-specs": [{{"spec-id": 0, "fields": []}}],
+    "last-partition-id": 0,
+    "default-sort-order-id": 0,
+    "sort-orders": [{{"order-id": 0, "fields": []}}],
+    "properties": {{}},
+    "current-snapshot-id": 2,
+    "snapshots": [
+        {{
+            "snapshot-id": 2,
+            "parent-snapshot-id": 1,
+            "timestamp-ms": 1515100956770,
+            "sequence-number": 1,
+            "summary": {{"operation": "append"}},
+            "manifest-list": "{mlist}",
+            "schema-id": 0
+        }}
+    ],
+    "snapshot-log": [{{"snapshot-id": 2, "timestamp-ms": 1515100956770}}],
+    "metadata-log": []
+}}"#,
+        uuid = Uuid::new_v4(),
+        loc = fixture.table_location,
+        mlist = snap2_manifest_list,
+    );
+    let modified_metadata: TableMetadata = serde_json::from_str(&metadata_json).unwrap();
+
+    let table = Table::builder()
+        .metadata(modified_metadata)
+        .identifier(TableIdent::from_strs(["db", "incremental_test"]).unwrap())
+        .file_io(fixture.table.file_io().clone())
+        .metadata_location(
+            format!("{}/metadata/v1.json", fixture.table_location).as_str(),
+        )
+        .build()
+        .unwrap();
+
+    // from=None: should return ALL live rows including those from the expired snapshot.
+    let scan = table.incremental_scan(None, None).build().unwrap();
+    let stream = scan.to_arrow().await.unwrap();
+    let batches: Vec<_> = stream.try_collect().await.unwrap();
+
+    let mut n_values: Vec<i32> = Vec::new();
+    for (batch_type, batch) in &batches {
+        if *batch_type == crate::arrow::IncrementalBatchType::Append {
+            let arr = batch
+                .column_by_name("n")
+                .unwrap()
+                .as_primitive::<Int32Type>();
+            for i in 0..batch.num_rows() {
+                n_values.push(arr.value(i));
+            }
+        }
+    }
+    n_values.sort();
+
+    assert_eq!(
+        n_values,
+        vec![1, 2, 3],
+        "from=None must include EXISTING entries from expired snapshots"
+    );
+
+    let delete_rows: usize = batches
+        .iter()
+        .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Delete)
+        .map(|(_, b)| b.num_rows())
+        .sum();
+    assert_eq!(
+        delete_rows, 0,
+        "from=None should produce no delete-stream entries"
+    );
+}
+
 #[tokio::test]
 async fn test_incremental_scan_with_file_column() {
     // Test that the _file metadata column works correctly in incremental scans

From 0e6f7a857ad720eab3509a30ceb68841f0512cfb Mon Sep 17 00:00:00 2001
From: Gerald Berger
Date: Tue, 21 Apr 2026 18:08:18 +0200
Subject: [PATCH 2/5] Format

---
 crates/iceberg/src/scan/incremental/tests.rs | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/crates/iceberg/src/scan/incremental/tests.rs b/crates/iceberg/src/scan/incremental/tests.rs
index 7115929a23..285226c193 100644
--- a/crates/iceberg/src/scan/incremental/tests.rs
+++ b/crates/iceberg/src/scan/incremental/tests.rs
@@ -2774,9 +2774,7 @@ async fn test_incremental_scan_from_none_includes_existing_entries_from_expired_
         .metadata(modified_metadata)
         .identifier(TableIdent::from_strs(["db", "incremental_test"]).unwrap())
         .file_io(fixture.table.file_io().clone())
-        .metadata_location(
-            format!("{}/metadata/v1.json", fixture.table_location).as_str(),
-        )
+        .metadata_location(format!("{}/metadata/v1.json", fixture.table_location).as_str())
         .build()
         .unwrap();

From 37bee4fddf0dd759a36f744b6b4e3bc14b3ebad3 Mon Sep 17 00:00:00 2001
From: Gerald Berger
Date: Tue, 21 Apr 2026 18:12:34 +0200
Subject: [PATCH 3/5] ci: fix zizmor ref-version-mismatch warnings in workflow
 files

Update version comments to match the exact tags that the pinned commit SHAs
correspond to, fixing zizmor v1.23.1 ref-version-mismatch findings.

Co-Authored-By: Claude Sonnet 4.6
---
 .github/workflows/bindings_python_ci.yml     | 2 +-
 .github/workflows/codeql.yml                 | 4 ++--
 .github/workflows/release_python.yml         | 8 ++++----
 .github/workflows/release_python_nightly.yml | 8 ++++----
 4 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/.github/workflows/bindings_python_ci.yml b/.github/workflows/bindings_python_ci.yml
index a7abfcbeed..a02ae9f0af 100644
--- a/.github/workflows/bindings_python_ci.yml
+++ b/.github/workflows/bindings_python_ci.yml
@@ -95,7 +95,7 @@ jobs:
       - uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6
         with:
           python-version: 3.12
-      - uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1
+      - uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1.50.1
         with:
           working-directory: "bindings/python"
           command: build
diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml
index 81bc6b16f8..fe0459aeb7 100644
--- a/.github/workflows/codeql.yml
+++ b/.github/workflows/codeql.yml
@@ -46,11 +46,11 @@ jobs:
           persist-credentials: false
 
       - name: Initialize CodeQL
-        uses: github/codeql-action/init@c10b8064de6f491fea524254123dbe5e09572f13 # v4
+        uses: github/codeql-action/init@c10b8064de6f491fea524254123dbe5e09572f13 # v4.35.1
         with:
           languages: actions
 
       - name: Perform CodeQL Analysis
-        uses: github/codeql-action/analyze@c10b8064de6f491fea524254123dbe5e09572f13 # v4
+        uses: github/codeql-action/analyze@c10b8064de6f491fea524254123dbe5e09572f13 # v4.35.1
         with:
           category: "/language:actions"
diff --git a/.github/workflows/release_python.yml b/.github/workflows/release_python.yml
index b19fa165dc..c9817e064c 100644
--- a/.github/workflows/release_python.yml
+++ b/.github/workflows/release_python.yml
@@ -124,13 +124,13 @@ jobs:
         env:
           NEEDS_VALIDATE_RELEASE_TAG_OUTPUTS_CARGO_VERSION: ${{ needs.validate-release-tag.outputs.cargo-version }}
 
-      - uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1
+      - uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1.50.1
         with:
           working-directory: "bindings/python"
           command: sdist
           args: -o dist
       - name: Upload sdist
-        uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7
+        uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7.0.0
         with:
           name: wheels-sdist
           path: bindings/python/dist
@@ -184,7 +184,7 @@ jobs:
         uses: ./.github/actions/setup-builder
         with:
          rust-version: ${{ steps.get-msrv.outputs.msrv }}
-      - uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1
+      - uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1.50.1
         with:
           target: ${{ matrix.target }}
           manylinux: ${{ matrix.manylinux || 'auto' }}
           working-directory: "bindings/python"
           command: build
           args: --release -o dist -i python3.12 # Explicitly set interpreter; manylinux containers have multiple Pythons and maturin may pick an older one
       - name: Upload wheels
-        uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7
+        uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7.0.0
         with:
           name: wheels-${{ matrix.os }}-${{ matrix.target }}
           path: bindings/python/dist
diff --git a/.github/workflows/release_python_nightly.yml b/.github/workflows/release_python_nightly.yml
index 66ae0e1db2..55695784e9 100644
--- a/.github/workflows/release_python_nightly.yml
+++ b/.github/workflows/release_python_nightly.yml
@@ -48,14 +48,14 @@ jobs:
         with:
           timestamp: ${{ needs.set-version.outputs.TIMESTAMP }}
 
-      - uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1
+      - uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1.50.1
         with:
           working-directory: "bindings/python"
           command: sdist
           args: -o dist
 
       - name: Upload sdist
-        uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7
+        uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7.0.0
         with:
           name: wheels-sdist
           path: bindings/python/dist
@@ -98,7 +98,7 @@ jobs:
         with:
           rust-version: ${{ steps.get-msrv.outputs.msrv }}
 
-      - uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1
+      - uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1.50.1
         with:
           target: ${{ matrix.target }}
           manylinux: ${{ matrix.manylinux || 'auto' }}
           working-directory: "bindings/python"
           command: build
           args: --release -o dist -i python3.12 # Explicitly set interpreter; manylinux containers have multiple Pythons and maturin may pick an older one
 
       - name: Upload wheels
-        uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7
+        uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7.0.0
         with:
           name: wheels-${{ matrix.os }}-${{ matrix.target }}
           path: bindings/python/dist

From 189bde69e3911f5c757ca3c8a52dca102a91809d Mon Sep 17 00:00:00 2001
From: Gerald Berger
Date: Tue, 21 Apr 2026 18:31:14 +0200
Subject: [PATCH 4/5] refactor(test): clean up expired-snapshot test using
 fixture helpers

Extract a `scan_and_verify` free function from `verify_incremental_scan` so
tests can pass `Option<i64>` snapshot IDs and an arbitrary `Table`. Add
`IncrementalTestFixture::with_expired_snapshots(n)`, which rebuilds the table
metadata with the `n` oldest snapshots removed, encapsulating the inline JSON
construction that previously lived in the test body. The regression test
drops from ~80 lines to ~20.

Co-Authored-By: Claude Sonnet 4.6
---
 crates/iceberg/src/scan/incremental/tests.rs | 391 ++++++++++---------
 1 file changed, 210 insertions(+), 181 deletions(-)

diff --git a/crates/iceberg/src/scan/incremental/tests.rs b/crates/iceberg/src/scan/incremental/tests.rs
index 285226c193..3cb7e7b43e 100644
--- a/crates/iceberg/src/scan/incremental/tests.rs
+++ b/crates/iceberg/src/scan/incremental/tests.rs
@@ -1424,89 +1424,212 @@ impl IncrementalTestFixture {
         expected_appends: Vec<(i32, &str)>,
         expected_deletes: Vec<(i64, &str)>,
     ) {
-        use arrow_array::cast::AsArray;
-        use arrow_select::concat::concat_batches;
-        use futures::TryStreamExt;
+        scan_and_verify(
+            &self.table,
+            Some(from_snapshot_id),
+            Some(to_snapshot_id),
+            expected_appends,
+            expected_deletes,
+        )
+        .await;
+    }
 
-        let incremental_scan = self
-            .table
-            .incremental_scan(Some(from_snapshot_id), Some(to_snapshot_id))
-            .build()
-            .unwrap();
+    /// Return a new `Table` backed by the same files but with the `num_to_expire` oldest
+    /// snapshots removed from the metadata, simulating catalog-level snapshot expiry.
+    pub fn with_expired_snapshots(&self, num_to_expire: usize) -> Table {
+        use std::collections::HashSet;
 
-        let stream = incremental_scan.to_arrow().await.unwrap();
-        let batches: Vec<_> = stream.try_collect().await.unwrap();
+        let metadata = self.table.metadata();
 
-        // Separate appends and deletes
-        let append_batches: Vec<_> = batches
+        let mut snapshots: Vec<_> = metadata.snapshots().collect();
+        snapshots.sort_by_key(|s| s.snapshot_id());
+        let kept = &snapshots[num_to_expire..];
+
+        let kept_ids: HashSet<i64> = kept.iter().map(|s| s.snapshot_id()).collect();
+
+        let snapshots_str = kept
             .iter()
-            .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Append)
-            .map(|(_, b)| b.clone())
-            .collect();
+            .map(|s| {
+                let parent_str = s
+                    .parent_snapshot_id()
+                    .filter(|pid| kept_ids.contains(pid))
+                    .map(|pid| format!(r#""parent-snapshot-id": {pid},"#))
+                    .unwrap_or_default();
+                format!(
+                    r#"        {{
+            "snapshot-id": {},
+            {}
+            "timestamp-ms": {},
+            "sequence-number": {},
+            "summary": {{"operation": "{}"}},
+            "manifest-list": "{}",
+            "schema-id": {}
+        }}"#,
+                    s.snapshot_id(),
+                    parent_str,
+                    s.timestamp_ms(),
+                    s.sequence_number(),
+                    s.summary().operation.as_str(),
+                    s.manifest_list(),
+                    s.schema_id().unwrap_or(0),
+                )
+            })
+            .collect::<Vec<_>>()
+            .join(",\n");
 
-        let delete_batches: Vec<_> = batches
-            .iter()
-            .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Delete)
-            .map(|(_, b)| b.clone())
-            .collect();
+        let snapshot_log_str = kept
+            .iter()
+            .map(|s| {
+                format!(
+                    r#"        {{"snapshot-id": {}, "timestamp-ms": {}}}"#,
+                    s.snapshot_id(),
+                    s.timestamp_ms()
+                )
+            })
+            .collect::<Vec<_>>()
+            .join(",\n");
+
+        let last_sequence_number = kept.iter().map(|s| s.sequence_number()).max().unwrap_or(0);
+        let current_snapshot_id = metadata.current_snapshot_id().unwrap();
+
+        let metadata_json = format!(
+            r#"{{
+    "format-version": 2,
+    "table-uuid": "{}",
+    "location": "{}",
+    "last-sequence-number": {},
+    "last-updated-ms": 1602638573590,
+    "last-column-id": 2,
+    "current-schema-id": 0,
+    "schemas": [
+        {{
+            "type": "struct",
+            "schema-id": 0,
+            "fields": [
+                {{"id": 1, "name": "n", "required": true, "type": "int"}},
+                {{"id": 2, "name": "data", "required": true, "type": "string"}}
+            ]
+        }}
+    ],
+    "default-spec-id": 0,
+    "partition-specs": [{{"spec-id": 0, "fields": []}}],
+    "last-partition-id": 0,
+    "default-sort-order-id": 0,
+    "sort-orders": [{{"order-id": 0, "fields": []}}],
+    "properties": {{}},
+    "current-snapshot-id": {},
+    "snapshots": [
+{}
+    ],
+    "snapshot-log": [
+{}
+    ],
+    "metadata-log": []
+}}"#,
+            Uuid::new_v4(),
+            self.table_location,
+            last_sequence_number,
+            current_snapshot_id,
+            snapshots_str,
+            snapshot_log_str,
+        );
+
+        let modified_metadata: TableMetadata = serde_json::from_str(&metadata_json).unwrap();
+
+        Table::builder()
+            .metadata(modified_metadata)
+            .identifier(self.table.identifier().clone())
+            .file_io(self.table.file_io().clone())
+            .metadata_location(
+                format!("{}/metadata/v1.json", self.table_location).as_str(),
+            )
+            .build()
+            .unwrap()
+    }
+}
+
+/// Run an incremental scan on `table` and assert the results match `expected_appends` /
+/// `expected_deletes`. Pass `None` for either snapshot ID to use full-scan / current
+/// semantics (the same as `table.incremental_scan(None, None)`).
+async fn scan_and_verify(
+    table: &Table,
+    from_snapshot_id: Option<i64>,
+    to_snapshot_id: Option<i64>,
+    expected_appends: Vec<(i32, &str)>,
+    expected_deletes: Vec<(i64, &str)>,
+) {
+    use arrow_array::cast::AsArray;
+    use arrow_select::concat::concat_batches;
+    use futures::TryStreamExt;
+
+    let incremental_scan = table
+        .incremental_scan(from_snapshot_id, to_snapshot_id)
+        .build()
+        .unwrap();
+
+    let stream = incremental_scan.to_arrow().await.unwrap();
+    let batches: Vec<_> = stream.try_collect().await.unwrap();
+
+    let append_batches: Vec<_> = batches
+        .iter()
+        .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Append)
+        .map(|(_, b)| b.clone())
+        .collect();
+
+    let delete_batches: Vec<_> = batches
+        .iter()
+        .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Delete)
+        .map(|(_, b)| b.clone())
+        .collect();
 
-        // Verify appended records
-        if !append_batches.is_empty() {
-            let append_batch =
-                concat_batches(&append_batches[0].schema(), append_batches.iter()).unwrap();
-
-            let n_array = append_batch
-                .column(0)
-                .as_primitive::<Int32Type>();
-            let data_array = append_batch.column(1).as_string::<i32>();
-
-            let mut appended_pairs: Vec<(i32, String)> = (0..append_batch.num_rows())
-                .map(|i| (n_array.value(i), data_array.value(i).to_string()))
-                .collect();
-            appended_pairs.sort();
-
-            let expected_appends: Vec<(i32, String)> = expected_appends
-                .into_iter()
-                .map(|(n, s)| (n, s.to_string()))
-                .collect();
-
-            assert_eq!(appended_pairs, expected_appends);
-        } else {
-            assert!(expected_appends.is_empty(), "Expected appends but got none");
-        }
+    if !append_batches.is_empty() {
+        let append_batch =
+            concat_batches(&append_batches[0].schema(), append_batches.iter()).unwrap();
+
+        let n_array = append_batch
+            .column(0)
+            .as_primitive::<Int32Type>();
+        let data_array = append_batch.column(1).as_string::<i32>();
+
+        let mut appended_pairs: Vec<(i32, String)> = (0..append_batch.num_rows())
+            .map(|i| (n_array.value(i), data_array.value(i).to_string()))
+            .collect();
+        appended_pairs.sort();
 
-        // Verify deleted records
-        if !delete_batches.is_empty() {
-            let delete_batch =
-                concat_batches(&delete_batches[0].schema(), delete_batches.iter()).unwrap();
-
-            // The file path column is first (column 0)
-            let file_path_column = delete_batch.column(0);
-            let file_path_array = file_path_column.as_string::<i32>();
-
-            // The pos column is second (column 1) and is Int64
-            let pos_array = delete_batch
-                .column(1)
-                .as_primitive::<Int64Type>();
-
-            let mut deleted_pairs: Vec<(i64, String)> = (0..delete_batch.num_rows())
-                .map(|i| {
-                    let pos = pos_array.value(i);
-                    let file_path = file_path_array.value(i).to_string();
-                    (pos, file_path)
-                })
-                .collect();
-            deleted_pairs.sort();
-
-            let expected_deletes: Vec<(i64, String)> = expected_deletes
-                .into_iter()
-                .map(|(pos, file)| (pos, file.to_string()))
-                .collect();
-
-            assert_eq!(deleted_pairs, expected_deletes);
-        } else {
-            assert!(expected_deletes.is_empty(), "Expected deletes but got none");
-        }
+        let mut expected: Vec<(i32, String)> = expected_appends
+            .into_iter()
+            .map(|(n, s)| (n, s.to_string()))
+            .collect();
+        expected.sort();
+
+        assert_eq!(appended_pairs, expected);
+    } else {
+        assert!(expected_appends.is_empty(), "Expected appends but got none");
+    }
+
+    if !delete_batches.is_empty() {
+        let delete_batch =
+            concat_batches(&delete_batches[0].schema(), delete_batches.iter()).unwrap();
+
+        let file_path_array = delete_batch.column(0).as_string::<i32>();
+        let pos_array = delete_batch
+            .column(1)
+            .as_primitive::<Int64Type>();
+
+        let mut deleted_pairs: Vec<(i64, String)> = (0..delete_batch.num_rows())
+            .map(|i| (pos_array.value(i), file_path_array.value(i).to_string()))
+            .collect();
+        deleted_pairs.sort();
+
+        let mut expected: Vec<(i64, String)> = expected_deletes
+            .into_iter()
+            .map(|(pos, file)| (pos, file.to_string()))
+            .collect();
+        expected.sort();
+
+        assert_eq!(deleted_pairs, expected);
+    } else {
+        assert!(expected_deletes.is_empty(), "Expected deletes but got none");
+    }
+}
@@ -2695,123 +2818,29 @@ async fn test_incremental_scan_includes_root_when_from_is_none() {
 #[tokio::test]
 async fn test_incremental_scan_from_none_includes_existing_entries_from_expired_snapshots() {
     // Simulate a table where older snapshots have been expired from the catalog.
-    // The only visible snapshot's manifest has EXISTING entries for data files that
-    // were added by the now-expired snapshot. Their snapshot_id is not in the
-    // accessible ancestry, so the old incremental-scan code dropped them silently.
-    // After the fix, from=None uses full-scan semantics and includes EXISTING entries.
-
+    // Snapshot 2's manifest has EXISTING entries for files added by the now-expired
+    // snapshot 1. The old code dropped them silently; after the fix, from=None uses
+    // full-scan semantics and includes all EXISTING entries.
     let fixture = IncrementalTestFixture::new(vec![
-        // Snapshot 1 (will be expired): adds rows 1 and 2
         Operation::Add(
             vec![(1, "a".to_string()), (2, "b".to_string())],
             "snap1-data.parquet".to_string(),
         ),
-        // Snapshot 2 (remains visible): adds row 3
         Operation::Add(vec![(3, "c".to_string())], "snap2-data.parquet".to_string()),
     ])
     .await;
 
-    // Snapshot 2's manifest (snap-2-manifest-list.avro) contains:
-    //   - EXISTING entry for snap1-data.parquet (snapshot_id = 1, i.e. expired)
-    //   - ADDED entry for snap2-data.parquet (snapshot_id = 2)
-    //
-    // Build table metadata that includes ONLY snapshot 2, simulating expiry of
-    // snapshot 1. Snapshot 2's parent-snapshot-id still points at 1, but since 1
-    // is absent, ancestors_of() stops at 2, giving snapshot_ids = {2}.
-    // Snapshot 2's manifest has Existing entries with snapshot_id=1 which is NOT in
-    // {2}, so the old code silently dropped them.
-    let snap2_manifest_list = format!(
-        "{}/metadata/snap-2-manifest-list.avro",
-        fixture.table_location
-    );
-    let metadata_json = format!(
-        r#"{{
-    "format-version": 2,
-    "table-uuid": "{uuid}",
-    "location": "{loc}",
-    "last-sequence-number": 1,
-    "last-updated-ms": 1602638573590,
-    "last-column-id": 2,
-    "current-schema-id": 0,
-    "schemas": [
-        {{
-            "type": "struct",
-            "schema-id": 0,
-            "fields": [
-                {{"id": 1, "name": "n", "required": true, "type": "int"}},
-                {{"id": 2, "name": "data", "required": true, "type": "string"}}
-            ]
-        }}
-    ],
-    "default-spec-id": 0,
-    "partition-specs": [{{"spec-id": 0, "fields": []}}],
-    "last-partition-id": 0,
-    "default-sort-order-id": 0,
-    "sort-orders": [{{"order-id": 0, "fields": []}}],
-    "properties": {{}},
-    "current-snapshot-id": 2,
-    "snapshots": [
-        {{
-            "snapshot-id": 2,
-            "parent-snapshot-id": 1,
-            "timestamp-ms": 1515100956770,
-            "sequence-number": 1,
-            "summary": {{"operation": "append"}},
-            "manifest-list": "{mlist}",
-            "schema-id": 0
-        }}
-    ],
-    "snapshot-log": [{{"snapshot-id": 2, "timestamp-ms": 1515100956770}}],
-    "metadata-log": []
-}}"#,
-        uuid = Uuid::new_v4(),
-        loc = fixture.table_location,
-        mlist = snap2_manifest_list,
-    );
-    let modified_metadata: TableMetadata = serde_json::from_str(&metadata_json).unwrap();
-
-    let table = Table::builder()
-        .metadata(modified_metadata)
-        .identifier(TableIdent::from_strs(["db", "incremental_test"]).unwrap())
-        .file_io(fixture.table.file_io().clone())
-        .metadata_location(format!("{}/metadata/v1.json", fixture.table_location).as_str())
-        .build()
-        .unwrap();
-
-    // from=None: should return ALL live rows including those from the expired snapshot.
-    let scan = table.incremental_scan(None, None).build().unwrap();
-    let stream = scan.to_arrow().await.unwrap();
-    let batches: Vec<_> = stream.try_collect().await.unwrap();
-
-    let mut n_values: Vec<i32> = Vec::new();
-    for (batch_type, batch) in &batches {
-        if *batch_type == crate::arrow::IncrementalBatchType::Append {
-            let arr = batch
-                .column_by_name("n")
-                .unwrap()
-                .as_primitive::<Int32Type>();
-            for i in 0..batch.num_rows() {
-                n_values.push(arr.value(i));
-            }
-        }
-    }
-    n_values.sort();
-
-    assert_eq!(
-        n_values,
-        vec![1, 2, 3],
-        "from=None must include EXISTING entries from expired snapshots"
-    );
-
-    let delete_rows: usize = batches
-        .iter()
-        .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Delete)
-        .map(|(_, b)| b.num_rows())
-        .sum();
-    assert_eq!(
-        delete_rows, 0,
-        "from=None should produce no delete-stream entries"
-    );
+    // Rebuild the table as if snapshot 1 was expired (only snapshot 2 visible).
+    let table = fixture.with_expired_snapshots(1);
 
+    scan_and_verify(
+        &table,
+        None,
+        None,
+        vec![(1, "a"), (2, "b"), (3, "c")],
+        vec![],
+    )
+    .await;
 }
 
 #[tokio::test]

From 2c75ac1d18f817c7662c07b1d4a3db7fe84e7d53 Mon Sep 17 00:00:00 2001
From: Gerald Berger
Date: Tue, 21 Apr 2026 18:31:26 +0200
Subject: [PATCH 5/5] Format

---
 crates/iceberg/src/scan/incremental/tests.rs | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/crates/iceberg/src/scan/incremental/tests.rs b/crates/iceberg/src/scan/incremental/tests.rs
index 3cb7e7b43e..2a53c9ff37 100644
--- a/crates/iceberg/src/scan/incremental/tests.rs
+++ b/crates/iceberg/src/scan/incremental/tests.rs
@@ -1540,9 +1540,7 @@ impl IncrementalTestFixture {
             .metadata(modified_metadata)
             .identifier(self.table.identifier().clone())
             .file_io(self.table.file_io().clone())
-            .metadata_location(
-                format!("{}/metadata/v1.json", self.table_location).as_str(),
-            )
+            .metadata_location(format!("{}/metadata/v1.json", self.table_location).as_str())
             .build()
             .unwrap()
     }
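
Reviewer note (appended after the series; not part of any patch): a minimal
consumer-side sketch of the contract patch 1 establishes, using only the API
surface visible in the diffs above (`incremental_scan`, `to_arrow`,
`IncrementalBatchType`). The function name and the `iceberg::...` import
paths are illustrative assumptions, not code from this series.

    use futures::TryStreamExt;
    use iceberg::arrow::IncrementalBatchType;
    use iceberg::table::Table;

    /// Count all live rows via a from=None incremental scan. With this
    /// series applied, rows whose data files survive only as EXISTING
    /// manifest entries (their adding snapshot has been expired) are
    /// still counted instead of being silently dropped.
    async fn count_live_rows(table: &Table) -> iceberg::Result<usize> {
        let scan = table.incremental_scan(None, None).build()?;
        let batches: Vec<_> = scan.to_arrow().await?.try_collect().await?;
        Ok(batches
            .iter()
            .filter(|(ty, _)| *ty == IncrementalBatchType::Append)
            .map(|(_, batch)| batch.num_rows())
            .sum())
    }

With from=None there is no delete stream to reconcile (patch 1 skips Deleted
entries and handles equality deletes per appended file), so summing the
append batches alone yields the table's live row count.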