diff --git a/.github/workflows/bindings_python_ci.yml b/.github/workflows/bindings_python_ci.yml index a7abfcbeed..a02ae9f0af 100644 --- a/.github/workflows/bindings_python_ci.yml +++ b/.github/workflows/bindings_python_ci.yml @@ -95,7 +95,7 @@ jobs: - uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6 with: python-version: 3.12 - - uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1 + - uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1.50.1 with: working-directory: "bindings/python" command: build diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml index 81bc6b16f8..fe0459aeb7 100644 --- a/.github/workflows/codeql.yml +++ b/.github/workflows/codeql.yml @@ -46,11 +46,11 @@ jobs: persist-credentials: false - name: Initialize CodeQL - uses: github/codeql-action/init@c10b8064de6f491fea524254123dbe5e09572f13 # v4 + uses: github/codeql-action/init@c10b8064de6f491fea524254123dbe5e09572f13 # v4.35.1 with: languages: actions - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@c10b8064de6f491fea524254123dbe5e09572f13 # v4 + uses: github/codeql-action/analyze@c10b8064de6f491fea524254123dbe5e09572f13 # v4.35.1 with: category: "/language:actions" diff --git a/.github/workflows/release_python.yml b/.github/workflows/release_python.yml index b19fa165dc..c9817e064c 100644 --- a/.github/workflows/release_python.yml +++ b/.github/workflows/release_python.yml @@ -124,13 +124,13 @@ jobs: env: NEEDS_VALIDATE_RELEASE_TAG_OUTPUTS_CARGO_VERSION: ${{ needs.validate-release-tag.outputs.cargo-version }} - - uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1 + - uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1.50.1 with: working-directory: "bindings/python" command: sdist args: -o dist - name: Upload sdist - uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7 + uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7.0.0 with: name: wheels-sdist path: bindings/python/dist @@ -184,7 +184,7 @@ jobs: uses: ./.github/actions/setup-builder with: rust-version: ${{ steps.get-msrv.outputs.msrv }} - - uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1 + - uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1.50.1 with: target: ${{ matrix.target }} manylinux: ${{ matrix.manylinux || 'auto' }} @@ -192,7 +192,7 @@ jobs: command: build args: --release -o dist -i python3.12 # Explicitly set interpreter; manylinux containers have multiple Pythons and maturin may pick an older one - name: Upload wheels - uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7 + uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7.0.0 with: name: wheels-${{ matrix.os }}-${{ matrix.target }} path: bindings/python/dist diff --git a/.github/workflows/release_python_nightly.yml b/.github/workflows/release_python_nightly.yml index 66ae0e1db2..55695784e9 100644 --- a/.github/workflows/release_python_nightly.yml +++ b/.github/workflows/release_python_nightly.yml @@ -48,14 +48,14 @@ jobs: with: timestamp: ${{ needs.set-version.outputs.TIMESTAMP }} - - uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1 + - uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1.50.1 with: working-directory: "bindings/python" command: sdist args: -o dist - name: Upload sdist - uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7 + uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7.0.0 with: name: wheels-sdist path: bindings/python/dist @@ -98,7 +98,7 @@ jobs: with: rust-version: ${{ steps.get-msrv.outputs.msrv }} - - uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1 + - uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1.50.1 with: target: ${{ matrix.target }} manylinux: ${{ matrix.manylinux || 'auto' }} @@ -107,7 +107,7 @@ jobs: args: --release -o dist -i python3.12 # Explicitly set interpreter; manylinux containers have multiple Pythons and maturin may pick an older one - name: Upload wheels - uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7 + uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7.0.0 with: name: wheels-${{ matrix.os }}-${{ matrix.target }} path: bindings/python/dist diff --git a/crates/iceberg/src/scan/incremental/context.rs b/crates/iceberg/src/scan/incremental/context.rs index c60c3461d8..874e09b4f1 100644 --- a/crates/iceberg/src/scan/incremental/context.rs +++ b/crates/iceberg/src/scan/incremental/context.rs @@ -38,6 +38,11 @@ pub(crate) struct IncrementalPlanContext { /// The snapshot to start the incremental scan from. pub from_snapshot: SnapshotRef, + /// The user-provided from_snapshot_id (None means "scan from the beginning"). + /// Used to distinguish a true "from beginning" scan (full scan semantics) from + /// an incremental scan with an explicit starting snapshot. + pub from_snapshot_id: Option, + /// The metadata of the table being scanned. pub table_metadata: TableMetadataRef, @@ -67,13 +72,41 @@ impl IncrementalPlanContext { delete_file_idx: DeleteFileIndex, delete_file_tx: Sender, ) -> Result> + 'static>> { - // Collect all snapshot IDs (all operation types are supported) - let snapshot_ids: HashSet = self.snapshots.iter().map(|s| s.snapshot_id()).collect(); - // Separate delete and data manifests to ensure deletes are processed first. // This prevents deadlock by ensuring delete processing completes // (and builds the delete filter) before data manifests are fetched. - let (delete_manifests, data_manifests, filter_fn) = { + let (delete_manifests, data_manifests, filter_fn) = if self.from_snapshot_id.is_none() { + // When from=None, use full-scan semantics: read ALL manifests from to_snapshot's + // manifest list without filtering by added_snapshot_id. This handles tables where + // older snapshots have been expired — their data files appear as EXISTING entries + // in later manifests and would otherwise be silently dropped. + let to_snapshot = self.snapshots.first().expect("snapshots is non-empty"); + let manifest_list = self + .object_cache + .get_manifest_list(to_snapshot, &self.table_metadata) + .await?; + + let mut delete_manifests = HashSet::::new(); + let mut data_manifests = HashSet::::new(); + for entry in manifest_list.entries() { + if entry.content == ManifestContentType::Deletes { + delete_manifests.insert(entry.clone()); + } else { + data_manifests.insert(entry.clone()); + } + } + + // No entry-level filtering: include entries from expired snapshots too. + ( + delete_manifests, + data_manifests, + None::>, + ) + } else { + // Collect all snapshot IDs (all operation types are supported) + let snapshot_ids: HashSet = + self.snapshots.iter().map(|s| s.snapshot_id()).collect(); + let mut delete_manifests = HashSet::::new(); let mut data_manifests = HashSet::::new(); diff --git a/crates/iceberg/src/scan/incremental/mod.rs b/crates/iceberg/src/scan/incremental/mod.rs index 74b9090f4e..690f88e8b5 100644 --- a/crates/iceberg/src/scan/incremental/mod.rs +++ b/crates/iceberg/src/scan/incremental/mod.rs @@ -370,6 +370,7 @@ impl<'a> IncrementalTableScanBuilder<'a> { let plan_context = IncrementalPlanContext { snapshots, from_snapshot: snapshot_from, + from_snapshot_id: self.from_snapshot_id, table_metadata: self.table.metadata_ref(), to_snapshot_schema: schema, object_cache: self.table.object_cache().clone(), @@ -504,8 +505,11 @@ impl IncrementalTableScan { // Collect data files that were live at from_snapshot. These are needed to generate // equality delete tasks for files that predate the scan range. - // Runs in parallel with the rest of the planning. - let from_snapshot_collection = if all_deletes.iter().any(is_equality_delete) { + // Skipped when from=None because in that case all data files are treated as appends + // (full-scan semantics), so equality deletes are handled via AppendedFileScanTask. + let from_snapshot_collection = if self.plan_context.from_snapshot_id.is_some() + && all_deletes.iter().any(is_equality_delete) + { Some(Self::spawn_manifest_entry_collection( self.plan_context.from_snapshot.clone(), self.plan_context.table_metadata.clone(), @@ -536,6 +540,7 @@ impl IncrementalTableScan { // Process the data file [`ManifestEntry`] stream in parallel let filter = delete_filter.clone(); let table_metadata = self.plan_context.table_metadata.clone(); + let from_snapshot_is_none = self.plan_context.from_snapshot_id.is_none(); spawn(async move { let result = manifest_entry_data_ctx_rx .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone()))) @@ -545,8 +550,13 @@ impl IncrementalTableScan { let filter = filter.clone(); let table_metadata = table_metadata.clone(); async move { - if manifest_entry_context.manifest_entry.status() - == ManifestStatus::Added + let status = manifest_entry_context.manifest_entry.status(); + // When from=None (full-scan semantics), treat both Added and Existing + // entries as appends. Existing entries arise from expired snapshots or + // manifest rewrites — they represent live files that must be included. + // Deleted entries are skipped since they are not live. + if status == ManifestStatus::Added + || (from_snapshot_is_none && status == ManifestStatus::Existing) { spawn(async move { Self::process_data_manifest_entry( @@ -558,9 +568,7 @@ impl IncrementalTableScan { .await }) .await - } else if manifest_entry_context.manifest_entry.status() - == ManifestStatus::Deleted - { + } else if status == ManifestStatus::Deleted && !from_snapshot_is_none { spawn(async move { Self::process_deleted_data_manifest_entry( tx, diff --git a/crates/iceberg/src/scan/incremental/tests.rs b/crates/iceberg/src/scan/incremental/tests.rs index 468e834b76..2a53c9ff37 100644 --- a/crates/iceberg/src/scan/incremental/tests.rs +++ b/crates/iceberg/src/scan/incremental/tests.rs @@ -1424,89 +1424,210 @@ impl IncrementalTestFixture { expected_appends: Vec<(i32, &str)>, expected_deletes: Vec<(i64, &str)>, ) { - use arrow_array::cast::AsArray; - use arrow_select::concat::concat_batches; - use futures::TryStreamExt; + scan_and_verify( + &self.table, + Some(from_snapshot_id), + Some(to_snapshot_id), + expected_appends, + expected_deletes, + ) + .await; + } - let incremental_scan = self - .table - .incremental_scan(Some(from_snapshot_id), Some(to_snapshot_id)) - .build() - .unwrap(); + /// Return a new `Table` backed by the same files but with the `num_to_expire` oldest + /// snapshots removed from the metadata, simulating catalog-level snapshot expiry. + pub fn with_expired_snapshots(&self, num_to_expire: usize) -> Table { + use std::collections::HashSet; - let stream = incremental_scan.to_arrow().await.unwrap(); - let batches: Vec<_> = stream.try_collect().await.unwrap(); + let metadata = self.table.metadata(); - // Separate appends and deletes - let append_batches: Vec<_> = batches + let mut snapshots: Vec<_> = metadata.snapshots().collect(); + snapshots.sort_by_key(|s| s.snapshot_id()); + let kept = &snapshots[num_to_expire..]; + + let kept_ids: HashSet = kept.iter().map(|s| s.snapshot_id()).collect(); + + let snapshots_str = kept .iter() - .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Append) - .map(|(_, b)| b.clone()) - .collect(); + .map(|s| { + let parent_str = s + .parent_snapshot_id() + .filter(|pid| kept_ids.contains(pid)) + .map(|pid| format!(r#""parent-snapshot-id": {pid},"#)) + .unwrap_or_default(); + format!( + r#" {{ + "snapshot-id": {}, + {} + "timestamp-ms": {}, + "sequence-number": {}, + "summary": {{"operation": "{}"}}, + "manifest-list": "{}", + "schema-id": {} + }}"#, + s.snapshot_id(), + parent_str, + s.timestamp_ms(), + s.sequence_number(), + s.summary().operation.as_str(), + s.manifest_list(), + s.schema_id().unwrap_or(0), + ) + }) + .collect::>() + .join(",\n"); - let delete_batches: Vec<_> = batches + let snapshot_log_str = kept .iter() - .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Delete) - .map(|(_, b)| b.clone()) + .map(|s| { + format!( + r#" {{"snapshot-id": {}, "timestamp-ms": {}}}"#, + s.snapshot_id(), + s.timestamp_ms() + ) + }) + .collect::>() + .join(",\n"); + + let last_sequence_number = kept.iter().map(|s| s.sequence_number()).max().unwrap_or(0); + let current_snapshot_id = metadata.current_snapshot_id().unwrap(); + + let metadata_json = format!( + r#"{{ + "format-version": 2, + "table-uuid": "{}", + "location": "{}", + "last-sequence-number": {}, + "last-updated-ms": 1602638573590, + "last-column-id": 2, + "current-schema-id": 0, + "schemas": [ + {{ + "type": "struct", + "schema-id": 0, + "fields": [ + {{"id": 1, "name": "n", "required": true, "type": "int"}}, + {{"id": 2, "name": "data", "required": true, "type": "string"}} + ] + }} + ], + "default-spec-id": 0, + "partition-specs": [{{"spec-id": 0, "fields": []}}], + "last-partition-id": 0, + "default-sort-order-id": 0, + "sort-orders": [{{"order-id": 0, "fields": []}}], + "properties": {{}}, + "current-snapshot-id": {}, + "snapshots": [ +{} + ], + "snapshot-log": [ +{} + ], + "metadata-log": [] +}}"#, + Uuid::new_v4(), + self.table_location, + last_sequence_number, + current_snapshot_id, + snapshots_str, + snapshot_log_str, + ); + + let modified_metadata: TableMetadata = serde_json::from_str(&metadata_json).unwrap(); + + Table::builder() + .metadata(modified_metadata) + .identifier(self.table.identifier().clone()) + .file_io(self.table.file_io().clone()) + .metadata_location(format!("{}/metadata/v1.json", self.table_location).as_str()) + .build() + .unwrap() + } +} + +/// Run an incremental scan on `table` and assert the results match `expected_appends` / +/// `expected_deletes`. Pass `None` for either snapshot ID to use full-scan / current +/// semantics (the same as `table.incremental_scan(None, None)`). +async fn scan_and_verify( + table: &Table, + from_snapshot_id: Option, + to_snapshot_id: Option, + expected_appends: Vec<(i32, &str)>, + expected_deletes: Vec<(i64, &str)>, +) { + use arrow_array::cast::AsArray; + use arrow_select::concat::concat_batches; + use futures::TryStreamExt; + + let incremental_scan = table + .incremental_scan(from_snapshot_id, to_snapshot_id) + .build() + .unwrap(); + + let stream = incremental_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = stream.try_collect().await.unwrap(); + + let append_batches: Vec<_> = batches + .iter() + .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Append) + .map(|(_, b)| b.clone()) + .collect(); + + let delete_batches: Vec<_> = batches + .iter() + .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Delete) + .map(|(_, b)| b.clone()) + .collect(); + + if !append_batches.is_empty() { + let append_batch = + concat_batches(&append_batches[0].schema(), append_batches.iter()).unwrap(); + + let n_array = append_batch + .column(0) + .as_primitive::(); + let data_array = append_batch.column(1).as_string::(); + + let mut appended_pairs: Vec<(i32, String)> = (0..append_batch.num_rows()) + .map(|i| (n_array.value(i), data_array.value(i).to_string())) .collect(); + appended_pairs.sort(); - // Verify appended records - if !append_batches.is_empty() { - let append_batch = - concat_batches(&append_batches[0].schema(), append_batches.iter()).unwrap(); - - let n_array = append_batch - .column(0) - .as_primitive::(); - let data_array = append_batch.column(1).as_string::(); - - let mut appended_pairs: Vec<(i32, String)> = (0..append_batch.num_rows()) - .map(|i| (n_array.value(i), data_array.value(i).to_string())) - .collect(); - appended_pairs.sort(); - - let expected_appends: Vec<(i32, String)> = expected_appends - .into_iter() - .map(|(n, s)| (n, s.to_string())) - .collect(); - - assert_eq!(appended_pairs, expected_appends); - } else { - assert!(expected_appends.is_empty(), "Expected appends but got none"); - } + let mut expected: Vec<(i32, String)> = expected_appends + .into_iter() + .map(|(n, s)| (n, s.to_string())) + .collect(); + expected.sort(); - // Verify deleted records - if !delete_batches.is_empty() { - let delete_batch = - concat_batches(&delete_batches[0].schema(), delete_batches.iter()).unwrap(); - - // The file path column is first (column 0) - let file_path_column = delete_batch.column(0); - let file_path_array = file_path_column.as_string::(); - - // The pos column is second (column 1) and is Int64 - let pos_array = delete_batch - .column(1) - .as_primitive::(); - - let mut deleted_pairs: Vec<(i64, String)> = (0..delete_batch.num_rows()) - .map(|i| { - let pos = pos_array.value(i); - let file_path = file_path_array.value(i).to_string(); - (pos, file_path) - }) - .collect(); - deleted_pairs.sort(); - - let expected_deletes: Vec<(i64, String)> = expected_deletes - .into_iter() - .map(|(pos, file)| (pos, file.to_string())) - .collect(); - - assert_eq!(deleted_pairs, expected_deletes); - } else { - assert!(expected_deletes.is_empty(), "Expected deletes but got none"); - } + assert_eq!(appended_pairs, expected); + } else { + assert!(expected_appends.is_empty(), "Expected appends but got none"); + } + + if !delete_batches.is_empty() { + let delete_batch = + concat_batches(&delete_batches[0].schema(), delete_batches.iter()).unwrap(); + + let file_path_array = delete_batch.column(0).as_string::(); + let pos_array = delete_batch + .column(1) + .as_primitive::(); + + let mut deleted_pairs: Vec<(i64, String)> = (0..delete_batch.num_rows()) + .map(|i| (pos_array.value(i), file_path_array.value(i).to_string())) + .collect(); + deleted_pairs.sort(); + + let mut expected: Vec<(i64, String)> = expected_deletes + .into_iter() + .map(|(pos, file)| (pos, file.to_string())) + .collect(); + expected.sort(); + + assert_eq!(deleted_pairs, expected); + } else { + assert!(expected_deletes.is_empty(), "Expected deletes but got none"); } } @@ -2692,6 +2813,34 @@ async fn test_incremental_scan_includes_root_when_from_is_none() { ); } +#[tokio::test] +async fn test_incremental_scan_from_none_includes_existing_entries_from_expired_snapshots() { + // Simulate a table where older snapshots have been expired from the catalog. + // Snapshot 2's manifest has EXISTING entries for files added by the now-expired + // snapshot 1. The old code dropped them silently; after the fix, from=None uses + // full-scan semantics and includes all EXISTING entries. + let fixture = IncrementalTestFixture::new(vec![ + Operation::Add( + vec![(1, "a".to_string()), (2, "b".to_string())], + "snap1-data.parquet".to_string(), + ), + Operation::Add(vec![(3, "c".to_string())], "snap2-data.parquet".to_string()), + ]) + .await; + + // Rebuild the table as if snapshot 1 was expired (only snapshot 2 visible). + let table = fixture.with_expired_snapshots(1); + + scan_and_verify( + &table, + None, + None, + vec![(1, "a"), (2, "b"), (3, "c")], + vec![], + ) + .await; +} + #[tokio::test] async fn test_incremental_scan_with_file_column() { // Test that the _file metadata column works correctly in incremental scans