Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/bindings_python_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ jobs:
- uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6
with:
python-version: 3.12
- uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1
- uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1.50.1
with:
working-directory: "bindings/python"
command: build
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/codeql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ jobs:
persist-credentials: false

- name: Initialize CodeQL
uses: github/codeql-action/init@c10b8064de6f491fea524254123dbe5e09572f13 # v4
uses: github/codeql-action/init@c10b8064de6f491fea524254123dbe5e09572f13 # v4.35.1
with:
languages: actions

- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@c10b8064de6f491fea524254123dbe5e09572f13 # v4
uses: github/codeql-action/analyze@c10b8064de6f491fea524254123dbe5e09572f13 # v4.35.1
with:
category: "/language:actions"
8 changes: 4 additions & 4 deletions .github/workflows/release_python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,13 @@ jobs:
env:
NEEDS_VALIDATE_RELEASE_TAG_OUTPUTS_CARGO_VERSION: ${{ needs.validate-release-tag.outputs.cargo-version }}

- uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1
- uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1.50.1
with:
working-directory: "bindings/python"
command: sdist
args: -o dist
- name: Upload sdist
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7.0.0
with:
name: wheels-sdist
path: bindings/python/dist
Expand Down Expand Up @@ -184,15 +184,15 @@ jobs:
uses: ./.github/actions/setup-builder
with:
rust-version: ${{ steps.get-msrv.outputs.msrv }}
- uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1
- uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1.50.1
with:
target: ${{ matrix.target }}
manylinux: ${{ matrix.manylinux || 'auto' }}
working-directory: "bindings/python"
command: build
args: --release -o dist -i python3.12 # Explicitly set interpreter; manylinux containers have multiple Pythons and maturin may pick an older one
- name: Upload wheels
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7.0.0
with:
name: wheels-${{ matrix.os }}-${{ matrix.target }}
path: bindings/python/dist
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/release_python_nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ jobs:
with:
timestamp: ${{ needs.set-version.outputs.TIMESTAMP }}

- uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1
- uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1.50.1
with:
working-directory: "bindings/python"
command: sdist
args: -o dist

- name: Upload sdist
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7.0.0
with:
name: wheels-sdist
path: bindings/python/dist
Expand Down Expand Up @@ -98,7 +98,7 @@ jobs:
with:
rust-version: ${{ steps.get-msrv.outputs.msrv }}

- uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1
- uses: PyO3/maturin-action@04ac600d27cdf7a9a280dadf7147097c42b757ad # v1.50.1
with:
target: ${{ matrix.target }}
manylinux: ${{ matrix.manylinux || 'auto' }}
Expand All @@ -107,7 +107,7 @@ jobs:
args: --release -o dist -i python3.12 # Explicitly set interpreter; manylinux containers have multiple Pythons and maturin may pick an older one

- name: Upload wheels
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7.0.0
with:
name: wheels-${{ matrix.os }}-${{ matrix.target }}
path: bindings/python/dist
Expand Down
41 changes: 37 additions & 4 deletions crates/iceberg/src/scan/incremental/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ pub(crate) struct IncrementalPlanContext {
/// The snapshot to start the incremental scan from.
pub from_snapshot: SnapshotRef,

/// The user-provided from_snapshot_id (None means "scan from the beginning").
/// Used to distinguish a true "from beginning" scan (full scan semantics) from
/// an incremental scan with an explicit starting snapshot.
pub from_snapshot_id: Option<i64>,

/// The metadata of the table being scanned.
pub table_metadata: TableMetadataRef,

Expand Down Expand Up @@ -67,13 +72,41 @@ impl IncrementalPlanContext {
delete_file_idx: DeleteFileIndex,
delete_file_tx: Sender<ManifestEntryContext>,
) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>> + 'static>> {
// Collect all snapshot IDs (all operation types are supported)
let snapshot_ids: HashSet<i64> = self.snapshots.iter().map(|s| s.snapshot_id()).collect();

// Separate delete and data manifests to ensure deletes are processed first.
// This prevents deadlock by ensuring delete processing completes
// (and builds the delete filter) before data manifests are fetched.
let (delete_manifests, data_manifests, filter_fn) = {
let (delete_manifests, data_manifests, filter_fn) = if self.from_snapshot_id.is_none() {
// When from=None, use full-scan semantics: read ALL manifests from to_snapshot's
// manifest list without filtering by added_snapshot_id. This handles tables where
// older snapshots have been expired — their data files appear as EXISTING entries
// in later manifests and would otherwise be silently dropped.
let to_snapshot = self.snapshots.first().expect("snapshots is non-empty");
let manifest_list = self
.object_cache
.get_manifest_list(to_snapshot, &self.table_metadata)
.await?;

let mut delete_manifests = HashSet::<ManifestFile>::new();
let mut data_manifests = HashSet::<ManifestFile>::new();
for entry in manifest_list.entries() {
if entry.content == ManifestContentType::Deletes {
delete_manifests.insert(entry.clone());
} else {
data_manifests.insert(entry.clone());
}
}

// No entry-level filtering: include entries from expired snapshots too.
(
delete_manifests,
data_manifests,
None::<Arc<ManifestEntryFilterFn>>,
)
} else {
// Collect all snapshot IDs (all operation types are supported)
let snapshot_ids: HashSet<i64> =
self.snapshots.iter().map(|s| s.snapshot_id()).collect();

let mut delete_manifests = HashSet::<ManifestFile>::new();
let mut data_manifests = HashSet::<ManifestFile>::new();

Expand Down
22 changes: 15 additions & 7 deletions crates/iceberg/src/scan/incremental/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ impl<'a> IncrementalTableScanBuilder<'a> {
let plan_context = IncrementalPlanContext {
snapshots,
from_snapshot: snapshot_from,
from_snapshot_id: self.from_snapshot_id,
table_metadata: self.table.metadata_ref(),
to_snapshot_schema: schema,
object_cache: self.table.object_cache().clone(),
Expand Down Expand Up @@ -504,8 +505,11 @@ impl IncrementalTableScan {

// Collect data files that were live at from_snapshot. These are needed to generate
// equality delete tasks for files that predate the scan range.
// Runs in parallel with the rest of the planning.
let from_snapshot_collection = if all_deletes.iter().any(is_equality_delete) {
// Skipped when from=None because in that case all data files are treated as appends
// (full-scan semantics), so equality deletes are handled via AppendedFileScanTask.
let from_snapshot_collection = if self.plan_context.from_snapshot_id.is_some()
&& all_deletes.iter().any(is_equality_delete)
{
Some(Self::spawn_manifest_entry_collection(
self.plan_context.from_snapshot.clone(),
self.plan_context.table_metadata.clone(),
Expand Down Expand Up @@ -536,6 +540,7 @@ impl IncrementalTableScan {
// Process the data file [`ManifestEntry`] stream in parallel
let filter = delete_filter.clone();
let table_metadata = self.plan_context.table_metadata.clone();
let from_snapshot_is_none = self.plan_context.from_snapshot_id.is_none();
spawn(async move {
let result = manifest_entry_data_ctx_rx
.map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
Expand All @@ -545,8 +550,13 @@ impl IncrementalTableScan {
let filter = filter.clone();
let table_metadata = table_metadata.clone();
async move {
if manifest_entry_context.manifest_entry.status()
== ManifestStatus::Added
let status = manifest_entry_context.manifest_entry.status();
// When from=None (full-scan semantics), treat both Added and Existing
// entries as appends. Existing entries arise from expired snapshots or
// manifest rewrites — they represent live files that must be included.
// Deleted entries are skipped since they are not live.
if status == ManifestStatus::Added
|| (from_snapshot_is_none && status == ManifestStatus::Existing)
{
spawn(async move {
Self::process_data_manifest_entry(
Expand All @@ -558,9 +568,7 @@ impl IncrementalTableScan {
.await
})
.await
} else if manifest_entry_context.manifest_entry.status()
== ManifestStatus::Deleted
{
} else if status == ManifestStatus::Deleted && !from_snapshot_is_none {
spawn(async move {
Self::process_deleted_data_manifest_entry(
tx,
Expand Down
Loading
Loading