From 9d99d19b81b00d9ef8847b2933c3f2ef75c7eeb6 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Tue, 17 Feb 2026 21:48:32 +0000 Subject: [PATCH 1/6] proto: serialize and dedupe dynamic filters Informs: https://github.com/datafusion-contrib/datafusion-distributed/issues/180 Closes: https://github.com/apache/datafusion/issues/20418 Consider this scenario 1. You have a plan with a `HashJoinExec` and `DataSourceExec` 2. You run the physical optimizer and the `DataSourceExec` accepts `DynamicFilterPhysicalExpr` pushdown from the `HashJoinExec` 3. You serialize the plan, deserialize it, and execute it What should happen is that the dynamic filter should "work", meaning 1. When you deserialize the plan, both the `HashJoinExec` and `DataSourceExec` should have pointers to the same `DynamicFilterPhysicalExpr` 2. The `DynamicFilterPhysicalExpr` should be updated during execution by the `HashJoinExec` and the `DataSourceExec` should filter out rows This does not happen today for a few reasons, a couple of which this PR aims to address 1. `DynamicFilterPhysicalExpr` does not survive round-tripping. The internal exprs get inlined (ex. it may be serialized as `Literal`) 2. Even if `DynamicFilterPhysicalExpr` survives round-tripping, during pushdown, it's often the case that the `DynamicFilterPhysicalExpr` is rewritten. In this case, you have two `DynamicFilterPhysicalExpr` which are different `Arc`s but share the same `Inner` dynamic filter state. The current `DeduplicatingProtoConverter` does not handle this specific form of deduping. This PR aims to fix those problems by adding serde for `DynamicFilterPhysicalExpr` and deduping logic for the inner state of dynamic filters. It does not yet add a test for the `HashJoinExec` and `DataSourceExec` filter pushdown case, but this is relevant follow-up work. I tried to keep the PR small for reviewers. Yes, via unit tests. 
`DynamicFilterPhysicalExpr` are now serialized by the default codec --- .../physical-expr-common/src/physical_expr.rs | 46 ++- .../src/expressions/dynamic_filters.rs | 237 ++++++++++- .../physical-expr/src/expressions/mod.rs | 2 +- datafusion/proto/proto/datafusion.proto | 27 +- datafusion/proto/src/generated/pbjson.rs | 294 +++++++++++++- datafusion/proto/src/generated/prost.rs | 37 +- .../proto/src/physical_plan/from_proto.rs | 45 +++ datafusion/proto/src/physical_plan/mod.rs | 96 +++-- .../proto/src/physical_plan/to_proto.rs | 93 +++-- .../tests/cases/roundtrip_physical_plan.rs | 379 +++++++++++++++++- 10 files changed, 1165 insertions(+), 91 deletions(-) diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 7107b0a9004d3..376994fe7f271 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -18,7 +18,7 @@ use std::any::Any; use std::fmt; use std::fmt::{Debug, Display, Formatter}; -use std::hash::{Hash, Hasher}; +use std::hash::{DefaultHasher, Hash, Hasher}; use std::sync::Arc; use crate::utils::scatter; @@ -441,6 +441,50 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { fn placement(&self) -> ExpressionPlacement { ExpressionPlacement::KeepInPlace } + + /// Returns a composite identifier for a [`PhysicalExpr`]. Note that if the expression + /// is dropped, then the returned id is no longer valid. + fn expr_id(self: Arc, salt: &[u64]) -> Option { + Some(PhysicalExprId::new(expr_id_from_arc(&self, salt), None)) + } +} + +/// A composite identifier for [`PhysicalExpr`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct PhysicalExprId { + exact: u64, + shallow: Option, +} + +impl PhysicalExprId { + /// Create a new [`PhysicalExprId`]. Both ids must be globally unique within + /// a process. 
+ pub fn new(exact: u64, shallow: Option) -> Self { + Self { exact, shallow } + } + + /// Returns the identifier for the full expression tree, including children. + pub fn exact(&self) -> u64 { + self.exact + } + + /// Returns the identifier for just the expression root, ignoring children. + pub fn shallow(&self) -> Option { + self.shallow + } +} + +/// Computes a unique identifier for a type contained within an [`Arc`]. It hashes +/// the [`Arc`] pointer to create a process-local identifier that remains valid +/// only while that allocation is still alive. +pub fn expr_id_from_arc(expr: &Arc, salt: &[u64]) -> u64 { + let mut hasher = DefaultHasher::new(); + let ptr = Arc::as_ptr(expr) as *const () as u64; + ptr.hash(&mut hasher); + for &salt in salt { + salt.hash(&mut hasher); + } + hasher.finish() } #[deprecated( diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index d285f8b377eca..2a408e86a8127 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -26,7 +26,9 @@ use datafusion_common::{ tree_node::{Transformed, TransformedResult, TreeNode}, }; use datafusion_expr::ColumnarValue; -use datafusion_physical_expr_common::physical_expr::DynHash; +use datafusion_physical_expr_common::physical_expr::{ + DynHash, PhysicalExprId, expr_id_from_arc, +}; /// State of a dynamic filter, tracking both updates and completion. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -88,6 +90,116 @@ struct Inner { is_complete: bool, } +/// An atomic snapshot of a [`DynamicFilterPhysicalExpr`] used to reconstruct the expression during +/// serialization / deserialization. +pub struct DynamicFilterSnapshot { + children: Vec>, + remapped_children: Option>>, + // Inner state. 
+ generation: u64, + inner_expr: Arc, + is_complete: bool, +} + +impl DynamicFilterSnapshot { + pub fn new( + children: Vec>, + remapped_children: Option>>, + generation: u64, + inner_expr: Arc, + is_complete: bool, + ) -> Self { + Self { + children, + remapped_children, + generation, + inner_expr, + is_complete, + } + } + + pub fn children(&self) -> &[Arc] { + &self.children + } + + pub fn remapped_children(&self) -> Option<&[Arc]> { + self.remapped_children.as_deref() + } + + pub fn generation(&self) -> u64 { + self.generation + } + + pub fn inner_expr(&self) -> &Arc { + &self.inner_expr + } + + pub fn is_complete(&self) -> bool { + self.is_complete + } +} +impl Display for DynamicFilterSnapshot { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "DynamicFilterSnapshot {{ children: {:?}, remapped_children: {:?}, generation: {}, inner_expr: {:?}, is_complete: {} }}", + self.children, + self.remapped_children, + self.generation, + self.inner_expr, + self.is_complete + ) + } +} + +impl From for DynamicFilterPhysicalExpr { + fn from(snapshot: DynamicFilterSnapshot) -> Self { + let DynamicFilterSnapshot { + children, + remapped_children, + generation, + inner_expr, + is_complete, + } = snapshot; + + let state = if is_complete { + FilterState::Complete { generation } + } else { + FilterState::InProgress { generation } + }; + let (state_watch, _) = watch::channel(state); + + Self { + children, + remapped_children, + inner: Arc::new(RwLock::new(Inner { + generation, + expr: inner_expr, + is_complete, + })), + state_watch, + data_type: Arc::new(RwLock::new(None)), + nullable: Arc::new(RwLock::new(None)), + } + } +} + +impl From<&DynamicFilterPhysicalExpr> for DynamicFilterSnapshot { + fn from(expr: &DynamicFilterPhysicalExpr) -> Self { + let (generation, inner_expr, is_complete) = { + let inner = expr.inner.read(); + (inner.generation, Arc::clone(&inner.expr), inner.is_complete) + }; + DynamicFilterSnapshot { + children: 
expr.children.clone(), + remapped_children: expr.remapped_children.clone(), + generation, + inner_expr, + is_complete, + } + } +} + impl Inner { fn new(expr: Arc) -> Self { Self { @@ -448,6 +560,13 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { // Return the current generation of the expression. self.inner.read().generation } + + fn expr_id(self: Arc, salt: &[u64]) -> Option { + Some(PhysicalExprId::new( + expr_id_from_arc(&self, salt), + Some(expr_id_from_arc(&self.inner, salt)), + )) + } } #[cfg(test)] @@ -867,4 +986,120 @@ mod test { "Hash should be stable after update (identity-based)" ); } + + #[test] + fn test_current_snapshot_roundtrip() { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let col_a = col("a", &schema).unwrap(); + + // Create a dynamic filter with children + let expr = Arc::new(BinaryExpr::new( + Arc::clone(&col_a), + datafusion_expr::Operator::Gt, + lit(10) as Arc, + )); + let filter = DynamicFilterPhysicalExpr::new( + vec![Arc::clone(&col_a)], + expr as Arc, + ); + + // Update expression and mark complete + filter + .update(lit(42) as Arc) + .expect("Update should succeed"); + filter.mark_complete(); + + // Take a snapshot and reconstruct + let snapshot = DynamicFilterSnapshot::from(&filter); + let reconstructed = DynamicFilterPhysicalExpr::from(snapshot); + + // String representations should be equal + assert_eq!( + DynamicFilterSnapshot::from(&filter).to_string(), + DynamicFilterSnapshot::from(&reconstructed).to_string(), + ); + } + + #[tokio::test] + async fn test_with_new_children_preserves_shared_inner_state() { + // Create a source filter + let source = Arc::new(DynamicFilterPhysicalExpr::new( + vec![], + lit(42) as Arc, + )); + + // Create a target filter with different children + let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)])); + let col_x = col("x", &schema).unwrap(); + // Rebuild a wrapper from the source filter using the target's children. 
+ let combined = Arc::clone(&source) + .with_new_children(vec![Arc::clone(&col_x)]) + .unwrap(); + + let combined_expr_id = Arc::clone(&combined) + .expr_id(&[]) + .expect("combined filter should have an expr_id"); + let source_expr_id = Arc::clone(&source) + .expr_id(&[]) + .expect("source filter should have an expr_id"); + assert_ne!( + combined_expr_id.exact(), + source_expr_id.exact(), + "with_new_children should produce a distinct outer Arc identity", + ); + assert_eq!( + combined_expr_id.shallow(), + source_expr_id.shallow(), + "dynamic filters with shared inner state should have the same shallow identity", + ); + + let combined_dyn_filter = combined + .as_any() + .downcast_ref::() + .unwrap(); + + // Verify the children are unchanged. + assert_eq!( + format!("{:?}", combined.children()), + format!("{:?}", vec![col_x]), + "Combined filter's children should be unchanged" + ); + + // Verify inner expression changed to the one from source. + assert_eq!( + format!("{:?}", combined_dyn_filter.current().unwrap()), + format!("{:?}", lit(42)), + "Combined filter should have inner expression from source filter" + ); + + // Verify that completing one filter also completes the other. 
+ let combined_binding = Arc::clone(&combined) as Arc; + #[expect(clippy::disallowed_methods)] + let wait_handle = tokio::task::spawn({ + async move { + let df = combined_binding + .as_any() + .downcast_ref::() + .unwrap(); + df.wait_complete().await; + format!("{:?}", df.current().unwrap()) + } + }); + source.update(lit(999) as Arc).unwrap(); + source.mark_complete(); + + // The linked filter should be notified via the shared watch + let result = tokio::time::timeout(std::time::Duration::from_secs(1), wait_handle) + .await + .expect( + "linked filter should have been notified of completion within timeout", + ) + .expect("task should not panic"); + + assert_eq!( + result, + format!("{:?}", lit(999)), + "linked filter should see source's updated inner expression" + ); + } } diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index 1e082355486f6..b40848828fa41 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -43,7 +43,7 @@ pub use case::{CaseExpr, case}; pub use cast::{CastExpr, cast}; pub use column::{Column, col, with_new_schema}; pub use datafusion_expr::utils::format_state_name; -pub use dynamic_filters::DynamicFilterPhysicalExpr; +pub use dynamic_filters::{DynamicFilterPhysicalExpr, DynamicFilterSnapshot}; pub use in_list::{InListExpr, in_list}; pub use is_not_null::{IsNotNullExpr, is_not_null}; pub use is_null::{IsNullExpr, is_null}; diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 9464e85727d4f..afeca09e623f2 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -871,17 +871,18 @@ message PhysicalExtensionNode { } // physical expressions +message PhysicalExprId { + uint64 exact = 1; + optional uint64 shallow = 2; +} + message PhysicalExprNode { // Was date_time_interval_expr reserved 17; - // Unique identifier for this expression to do 
deduplication during deserialization. - // When serializing, this is set to a unique identifier for each combination of - // expression, process and serialization run. - // When deserializing, if this ID has been seen before, the cached Arc is returned - // instead of creating a new one, enabling reconstruction of referential integrity - // across serde roundtrips. - optional uint64 expr_id = 30; + // Unique identifier for this expression used during deserialization to restore + // referential integrity across serde roundtrips. + PhysicalExprId expr_id = 31; oneof ExprType { // column references @@ -920,9 +921,19 @@ message PhysicalExprNode { UnknownColumn unknown_column = 20; PhysicalHashExprNode hash_expr = 21; + + PhysicalDynamicFilterNode dynamic_filter = 22; } } +message PhysicalDynamicFilterNode { + repeated PhysicalExprNode children = 1; + repeated PhysicalExprNode remapped_children = 2; + uint64 generation = 3; + PhysicalExprNode inner_expr = 4; + bool is_complete = 5; +} + message PhysicalScalarUdfNode { string name = 1; repeated PhysicalExprNode args = 2; @@ -1474,4 +1485,4 @@ message AsyncFuncExecNode { message BufferExecNode { PhysicalPlanNode input = 1; uint64 capacity = 2; -} \ No newline at end of file +} diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index c81e2fabe1185..d007b146bd3b7 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -16514,6 +16514,278 @@ impl<'de> serde::Deserialize<'de> for PhysicalDateTimeIntervalExprNode { deserializer.deserialize_struct("datafusion.PhysicalDateTimeIntervalExprNode", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for PhysicalDynamicFilterNode { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if !self.children.is_empty() { + len += 1; + } + if !self.remapped_children.is_empty() { 
+ len += 1; + } + if self.generation != 0 { + len += 1; + } + if self.inner_expr.is_some() { + len += 1; + } + if self.is_complete { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalDynamicFilterNode", len)?; + if !self.children.is_empty() { + struct_ser.serialize_field("children", &self.children)?; + } + if !self.remapped_children.is_empty() { + struct_ser.serialize_field("remappedChildren", &self.remapped_children)?; + } + if self.generation != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("generation", ToString::to_string(&self.generation).as_str())?; + } + if let Some(v) = self.inner_expr.as_ref() { + struct_ser.serialize_field("innerExpr", v)?; + } + if self.is_complete { + struct_ser.serialize_field("isComplete", &self.is_complete)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for PhysicalDynamicFilterNode { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "children", + "remapped_children", + "remappedChildren", + "generation", + "inner_expr", + "innerExpr", + "is_complete", + "isComplete", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Children, + RemappedChildren, + Generation, + InnerExpr, + IsComplete, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl serde::de::Visitor<'_> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "children" => Ok(GeneratedField::Children), 
+ "remappedChildren" | "remapped_children" => Ok(GeneratedField::RemappedChildren), + "generation" => Ok(GeneratedField::Generation), + "innerExpr" | "inner_expr" => Ok(GeneratedField::InnerExpr), + "isComplete" | "is_complete" => Ok(GeneratedField::IsComplete), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = PhysicalDynamicFilterNode; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.PhysicalDynamicFilterNode") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut children__ = None; + let mut remapped_children__ = None; + let mut generation__ = None; + let mut inner_expr__ = None; + let mut is_complete__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Children => { + if children__.is_some() { + return Err(serde::de::Error::duplicate_field("children")); + } + children__ = Some(map_.next_value()?); + } + GeneratedField::RemappedChildren => { + if remapped_children__.is_some() { + return Err(serde::de::Error::duplicate_field("remappedChildren")); + } + remapped_children__ = Some(map_.next_value()?); + } + GeneratedField::Generation => { + if generation__.is_some() { + return Err(serde::de::Error::duplicate_field("generation")); + } + generation__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::InnerExpr => { + if inner_expr__.is_some() { + return Err(serde::de::Error::duplicate_field("innerExpr")); + } + inner_expr__ = map_.next_value()?; + } + GeneratedField::IsComplete => { + if is_complete__.is_some() { + return Err(serde::de::Error::duplicate_field("isComplete")); + } + is_complete__ = Some(map_.next_value()?); + } + } + } + Ok(PhysicalDynamicFilterNode { + 
children: children__.unwrap_or_default(), + remapped_children: remapped_children__.unwrap_or_default(), + generation: generation__.unwrap_or_default(), + inner_expr: inner_expr__, + is_complete: is_complete__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.PhysicalDynamicFilterNode", FIELDS, GeneratedVisitor) + } +} +impl serde::Serialize for PhysicalExprId { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.exact != 0 { + len += 1; + } + if self.shallow.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalExprId", len)?; + if self.exact != 0 { + struct_ser.serialize_field("exact", ToString::to_string(&self.exact).as_str())?; + } + if let Some(v) = self.shallow.as_ref() { + struct_ser.serialize_field("shallow", ToString::to_string(&v).as_str())?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for PhysicalExprId { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &["exact", "shallow"]; + + enum GeneratedField { + Exact, + Shallow, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + impl serde::de::Visitor<'_> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "exact" => Ok(GeneratedField::Exact), + "shallow" => Ok(GeneratedField::Shallow), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + 
deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = PhysicalExprId; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.PhysicalExprId") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut exact__ = None; + let mut shallow__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Exact => { + if exact__.is_some() { + return Err(serde::de::Error::duplicate_field("exact")); + } + exact__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::Shallow => { + if shallow__.is_some() { + return Err(serde::de::Error::duplicate_field("shallow")); + } + shallow__ = + map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0) + ; + } + } + } + Ok(PhysicalExprId { + exact: exact__.unwrap_or_default(), + shallow: shallow__, + }) + } + } + deserializer.deserialize_struct("datafusion.PhysicalExprId", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for PhysicalExprNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -16530,9 +16802,7 @@ impl serde::Serialize for PhysicalExprNode { } let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalExprNode", len)?; if let Some(v) = self.expr_id.as_ref() { - #[allow(clippy::needless_borrow)] - #[allow(clippy::needless_borrows_for_generic_args)] - struct_ser.serialize_field("exprId", ToString::to_string(&v).as_str())?; + struct_ser.serialize_field("exprId", v)?; } if let Some(v) = self.expr_type.as_ref() { match v { @@ -16593,6 +16863,9 @@ impl serde::Serialize for PhysicalExprNode { physical_expr_node::ExprType::HashExpr(v) => { struct_ser.serialize_field("hashExpr", v)?; } + physical_expr_node::ExprType::DynamicFilter(v) => { + 
struct_ser.serialize_field("dynamicFilter", v)?; + } } } struct_ser.end() @@ -16639,6 +16912,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { "unknownColumn", "hash_expr", "hashExpr", + "dynamic_filter", + "dynamicFilter", ]; #[allow(clippy::enum_variant_names)] @@ -16663,6 +16938,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { Extension, UnknownColumn, HashExpr, + DynamicFilter, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -16704,6 +16980,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { "extension" => Ok(GeneratedField::Extension), "unknownColumn" | "unknown_column" => Ok(GeneratedField::UnknownColumn), "hashExpr" | "hash_expr" => Ok(GeneratedField::HashExpr), + "dynamicFilter" | "dynamic_filter" => Ok(GeneratedField::DynamicFilter), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -16731,9 +17008,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { if expr_id__.is_some() { return Err(serde::de::Error::duplicate_field("exprId")); } - expr_id__ = - map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0) - ; + expr_id__ = map_.next_value()?; } GeneratedField::Column => { if expr_type__.is_some() { @@ -16866,6 +17141,13 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { return Err(serde::de::Error::duplicate_field("hashExpr")); } expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::HashExpr) +; + } + GeneratedField::DynamicFilter => { + if expr_type__.is_some() { + return Err(serde::de::Error::duplicate_field("dynamicFilter")); + } + expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::DynamicFilter) ; } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index ff9133b1ced5b..acf5433ac72f7 100644 --- a/datafusion/proto/src/generated/prost.rs +++ 
b/datafusion/proto/src/generated/prost.rs @@ -1305,19 +1305,23 @@ pub struct PhysicalExtensionNode { pub inputs: ::prost::alloc::vec::Vec, } /// physical expressions +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PhysicalExprId { + #[prost(uint64, tag = "1")] + pub exact: u64, + #[prost(uint64, optional, tag = "2")] + pub shallow: ::core::option::Option, +} + #[derive(Clone, PartialEq, ::prost::Message)] pub struct PhysicalExprNode { - /// Unique identifier for this expression to do deduplication during deserialization. - /// When serializing, this is set to a unique identifier for each combination of - /// expression, process and serialization run. - /// When deserializing, if this ID has been seen before, the cached Arc is returned - /// instead of creating a new one, enabling reconstruction of referential integrity - /// across serde roundtrips. - #[prost(uint64, optional, tag = "30")] - pub expr_id: ::core::option::Option, + /// Unique identifier for this expression used during deserialization to restore + /// referential integrity across serde roundtrips. 
+ #[prost(message, optional, tag = "31")] + pub expr_id: ::core::option::Option, #[prost( oneof = "physical_expr_node::ExprType", - tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20, 21" + tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20, 21, 22" )] pub expr_type: ::core::option::Option, } @@ -1370,9 +1374,24 @@ pub mod physical_expr_node { UnknownColumn(super::UnknownColumn), #[prost(message, tag = "21")] HashExpr(super::PhysicalHashExprNode), + #[prost(message, tag = "22")] + DynamicFilter(::prost::alloc::boxed::Box), } } #[derive(Clone, PartialEq, ::prost::Message)] +pub struct PhysicalDynamicFilterNode { + #[prost(message, repeated, tag = "1")] + pub children: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "2")] + pub remapped_children: ::prost::alloc::vec::Vec, + #[prost(uint64, tag = "3")] + pub generation: u64, + #[prost(message, optional, boxed, tag = "4")] + pub inner_expr: ::core::option::Option<::prost::alloc::boxed::Box>, + #[prost(bool, tag = "5")] + pub is_complete: bool, +} +#[derive(Clone, PartialEq, ::prost::Message)] pub struct PhysicalScalarUdfNode { #[prost(string, tag = "1")] pub name: ::prost::alloc::string::String, diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 73a347331cb6b..c19c0dbf6d92a 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -58,6 +58,9 @@ use super::{ use crate::logical_plan::{self}; use crate::protobuf::physical_expr_node::ExprType; use crate::{convert_required, protobuf}; +use datafusion_physical_expr::expressions::{ + DynamicFilterPhysicalExpr, DynamicFilterSnapshot, +}; impl From<&protobuf::PhysicalColumn> for Column { fn from(c: &protobuf::PhysicalColumn) -> Column { @@ -490,6 +493,48 @@ pub fn parse_physical_expr_with_converter( hash_expr.description.clone(), )) } + ExprType::DynamicFilter(dynamic_filter) => { + let children = 
parse_physical_exprs( + &dynamic_filter.children, + ctx, + input_schema, + codec, + proto_converter, + )?; + + let remapped_children = if !dynamic_filter.remapped_children.is_empty() { + Some(parse_physical_exprs( + &dynamic_filter.remapped_children, + ctx, + input_schema, + codec, + proto_converter, + )?) + } else { + None + }; + + let inner_expr = parse_required_physical_expr( + dynamic_filter.inner_expr.as_deref(), + ctx, + "inner_expr", + input_schema, + codec, + proto_converter, + )?; + + // Recreate filter from snapshot + let snapshot = DynamicFilterSnapshot::new( + children, + remapped_children, + dynamic_filter.generation, + inner_expr, + dynamic_filter.is_complete, + ); + let base_filter: Arc = + Arc::new(DynamicFilterPhysicalExpr::from(snapshot)); + base_filter + } ExprType::Extension(extension) => { let inputs: Vec> = extension .inputs diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 4e37e9f9528e5..6fb5bbcf53cfa 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -19,7 +19,6 @@ use std::any::Any; use std::cell::RefCell; use std::collections::HashMap; use std::fmt::Debug; -use std::hash::{DefaultHasher, Hash, Hasher}; use std::sync::Arc; use arrow::compute::SortOptions; @@ -58,6 +57,7 @@ use datafusion_functions_table::generate_series::{ use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; use datafusion_physical_expr::async_scalar_function::AsyncFuncExpr; use datafusion_physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef}; +use datafusion_physical_expr_common::physical_expr::PhysicalExprId; use datafusion_physical_plan::aggregates::{ AggregateExec, AggregateMode, LimitOptions, PhysicalGroupBy, }; @@ -3793,6 +3793,18 @@ struct DataEncoderTuple { } pub struct DefaultPhysicalProtoConverter; + +fn from_proto_expr_id(expr_id: &protobuf::PhysicalExprId) -> Result { + Ok(PhysicalExprId::new(expr_id.exact, 
expr_id.shallow)) +} + +fn to_proto_expr_id(expr_id: PhysicalExprId) -> protobuf::PhysicalExprId { + protobuf::PhysicalExprId { + exact: expr_id.exact(), + shallow: expr_id.shallow(), + } +} + impl PhysicalProtoConverterExtension for DefaultPhysicalProtoConverter { fn proto_to_execution_plan( &self, @@ -3841,10 +3853,10 @@ impl PhysicalProtoConverterExtension for DefaultPhysicalProtoConverter { } } -/// Internal serializer that adds expr_id to expressions. -/// Created fresh for each serialization operation. +/// Internal serializer that makes distinct expr_ids for each serialization session. struct DeduplicatingSerializer { - /// Random salt combined with pointer addresses and process ID to create globally unique expr_ids. + /// Random salt for this serializer which gets hashed with expression ids to create + /// unique ids for this serialization session. session_id: u64, } @@ -3900,17 +3912,9 @@ impl PhysicalProtoConverterExtension for DeduplicatingSerializer { codec: &dyn PhysicalExtensionCodec, ) -> Result { let mut proto = serialize_physical_expr_with_converter(expr, codec, self)?; - - // Hash session_id, pointer address, and process ID together to create expr_id. - // - session_id: random per serializer, prevents collisions when merging serializations - // - ptr: unique address per Arc within a process - // - pid: prevents collisions if serializer is shared across processes - let mut hasher = DefaultHasher::new(); - self.session_id.hash(&mut hasher); - (Arc::as_ptr(expr) as *const () as u64).hash(&mut hasher); - std::process::id().hash(&mut hasher); - proto.expr_id = Some(hasher.finish()); - + proto.expr_id = Arc::clone(expr) + .expr_id(&[self.session_id]) + .map(to_proto_expr_id); Ok(proto) } } @@ -3919,7 +3923,7 @@ impl PhysicalProtoConverterExtension for DeduplicatingSerializer { /// Created fresh for each deserialization operation. #[derive(Default)] struct DeduplicatingDeserializer { - /// Cache mapping expr_id to deserialized expressions. 
+ /// Cache mapping exact and shallow expr ids to deserialized expressions. cache: RefCell>>, } @@ -3954,24 +3958,40 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { where Self: Sized, { - if let Some(expr_id) = proto.expr_id { - // Check cache first - if let Some(cached) = self.cache.borrow().get(&expr_id) { - return Ok(Arc::clone(cached)); + let expr_id = proto.expr_id.as_ref().map(from_proto_expr_id).transpose()?; + + // The expressions are exactly the same, so return the cached expr. + if let Some(expr_id) = expr_id.as_ref() + && let Some(cached) = self.cache.borrow().get(&expr_id.exact()) + { + return Ok(Arc::clone(cached)); + } + + // Cache miss, we must deserialize the expr. + let mut expr = + parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self)?; + + if let Some(expr_id) = expr_id { + if let Some(shallow_id) = expr_id.shallow() { + if let Some(cached_expr) = self.cache.borrow().get(&shallow_id) { + // Cache hit on shallow id. Re-use the cached expr with children from the + // deserialized expr. + let children = expr.children().into_iter().cloned().collect(); + expr = Arc::clone(cached_expr).with_new_children(children)?; + } else { + // Cache miss on shallow id. Cache the expr. + self.cache + .borrow_mut() + .insert(shallow_id, Arc::clone(&expr)); + } } - // Deserialize and cache - let expr = parse_physical_expr_with_converter( - proto, - ctx, - input_schema, - codec, - self, - )?; - self.cache.borrow_mut().insert(expr_id, Arc::clone(&expr)); - Ok(expr) - } else { - parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self) + + self.cache + .borrow_mut() + .insert(expr_id.exact(), Arc::clone(&expr)); } + + Ok(expr) } fn physical_expr_to_proto( @@ -3986,13 +4006,13 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { /// A proto converter that adds expression deduplication during serialization /// and deserialization. 
/// -/// During serialization, each expression's Arc pointer address is XORed with a -/// random session_id to create a salted `expr_id`. This prevents cross-process -/// collisions when serialized plans are merged. +/// During serialization, each expression's identifier is salted with a random +/// session_id. This prevents cross-process collisions when serialized plans are merged. /// -/// During deserialization, expressions with the same `expr_id` share the same -/// Arc, reducing memory usage for plans with duplicate expressions (e.g., large -/// IN lists) and supporting correctly linking [`DynamicFilterPhysicalExpr`] instances. +/// During deserialization, expressions with the same `expr_id` are reconstructed +/// according to the id variant, reducing memory usage for plans with duplicate +/// expressions (e.g., large IN lists) and preserving referential integrity for +/// [`DynamicFilterPhysicalExpr`] instances. /// /// This converter is stateless - it creates internal serializers/deserializers /// on demand for each operation. 
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 990a54cf94c7a..d7e4952c89078 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -33,11 +33,12 @@ use datafusion_datasource_parquet::file_format::ParquetSink; use datafusion_expr::WindowFrame; use datafusion_physical_expr::ScalarFunctionExpr; use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; -use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr; +use datafusion_physical_expr_common::physical_expr::PhysicalExprId; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::expressions::{ - BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr, - LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, + BinaryExpr, CaseExpr, CastExpr, Column, DynamicFilterPhysicalExpr, + DynamicFilterSnapshot, InListExpr, IsNotNullExpr, IsNullExpr, LikeExpr, Literal, + NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, }; use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr}; use datafusion_physical_plan::udaf::AggregateFunctionExpr; @@ -53,6 +54,13 @@ use crate::protobuf::{ physical_aggregate_expr_node, physical_window_expr_node, }; +fn to_proto_expr_id(expr_id: PhysicalExprId) -> protobuf::PhysicalExprId { + protobuf::PhysicalExprId { + exact: expr_id.exact(), + shallow: expr_id.shallow(), + } +} + #[expect(clippy::needless_pass_by_value)] pub fn serialize_physical_aggr_expr( aggr_expr: Arc, @@ -72,6 +80,7 @@ pub fn serialize_physical_aggr_expr( codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?; Ok(protobuf::PhysicalExprNode { expr_id: None, + expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr( protobuf::PhysicalAggregateExprNode { aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name)), @@ 
-257,11 +266,6 @@ pub fn serialize_physical_expr_with_converter( codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result { - // Snapshot the expr in case it has dynamic predicate state so - // it can be serialized - let value = snapshot_physical_expr(Arc::clone(value))?; - let expr = value.as_any(); - // HashTableLookupExpr is used for dynamic filter pushdown in hash joins. // It contains an Arc (the build-side hash table) which // cannot be serialized - the hash table is a runtime structure built during @@ -275,7 +279,11 @@ pub fn serialize_physical_expr_with_converter( // // In distributed execution, the remote worker won't have access to the hash // table anyway, so the best we can do is skip this optimization. - if expr.downcast_ref::().is_some() { + if value + .as_any() + .downcast_ref::() + .is_some() + { let value = datafusion_proto_common::ScalarValue { value: Some(datafusion_proto_common::scalar_value::Value::BoolValue( true, @@ -287,7 +295,7 @@ pub fn serialize_physical_expr_with_converter( }); } - if let Some(expr) = expr.downcast_ref::() { + if let Some(expr) = value.as_any().downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::Column( @@ -297,7 +305,7 @@ pub fn serialize_physical_expr_with_converter( }, )), }) - } else if let Some(expr) = expr.downcast_ref::() { + } else if let Some(expr) = value.as_any().downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn( @@ -306,7 +314,7 @@ pub fn serialize_physical_expr_with_converter( }, )), }) - } else if let Some(expr) = expr.downcast_ref::() { + } else if let Some(expr) = value.as_any().downcast_ref::() { let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode { l: Some(Box::new( proto_converter.physical_expr_to_proto(expr.left(), codec)?, @@ -323,7 +331,7 @@ pub fn serialize_physical_expr_with_converter( 
binary_expr, )), }) - } else if let Some(expr) = expr.downcast_ref::() { + } else if let Some(expr) = value.as_any().downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, expr_type: Some( @@ -366,7 +374,7 @@ pub fn serialize_physical_expr_with_converter( ), ), }) - } else if let Some(expr) = expr.downcast_ref::() { + } else if let Some(expr) = value.as_any().downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new( @@ -377,7 +385,7 @@ pub fn serialize_physical_expr_with_converter( }, ))), }) - } else if let Some(expr) = expr.downcast_ref::() { + } else if let Some(expr) = value.as_any().downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr( @@ -388,7 +396,7 @@ pub fn serialize_physical_expr_with_converter( }), )), }) - } else if let Some(expr) = expr.downcast_ref::() { + } else if let Some(expr) = value.as_any().downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr( @@ -399,7 +407,7 @@ pub fn serialize_physical_expr_with_converter( }), )), }) - } else if let Some(expr) = expr.downcast_ref::() { + } else if let Some(expr) = value.as_any().downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::InList(Box::new( @@ -412,7 +420,7 @@ pub fn serialize_physical_expr_with_converter( }, ))), }) - } else if let Some(expr) = expr.downcast_ref::() { + } else if let Some(expr) = value.as_any().downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::Negative(Box::new( @@ -423,14 +431,14 @@ pub fn serialize_physical_expr_with_converter( }, ))), }) - } else if let Some(lit) = expr.downcast_ref::() { + } else if let Some(lit) = value.as_any().downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: 
None, expr_type: Some(protobuf::physical_expr_node::ExprType::Literal( lit.value().try_into()?, )), }) - } else if let Some(cast) = expr.downcast_ref::() { + } else if let Some(cast) = value.as_any().downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new( @@ -442,7 +450,7 @@ pub fn serialize_physical_expr_with_converter( }, ))), }) - } else if let Some(cast) = expr.downcast_ref::() { + } else if let Some(cast) = value.as_any().downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new( @@ -454,7 +462,7 @@ pub fn serialize_physical_expr_with_converter( }, ))), }) - } else if let Some(expr) = expr.downcast_ref::() { + } else if let Some(expr) = value.as_any().downcast_ref::() { let mut buf = Vec::new(); codec.try_encode_udf(expr.fun(), &mut buf)?; Ok(protobuf::PhysicalExprNode { @@ -473,7 +481,7 @@ pub fn serialize_physical_expr_with_converter( }, )), }) - } else if let Some(expr) = expr.downcast_ref::() { + } else if let Some(expr) = value.as_any().downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::LikeExpr(Box::new( @@ -489,7 +497,7 @@ pub fn serialize_physical_expr_with_converter( }, ))), }) - } else if let Some(expr) = expr.downcast_ref::() { + } else if let Some(expr) = value.as_any().downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::HashExpr( @@ -504,9 +512,44 @@ pub fn serialize_physical_expr_with_converter( }, )), }) + } else if let Some(df) = value.as_any().downcast_ref::() { + // Capture all state atomically, including the internal id. 
+ let snapshot = DynamicFilterSnapshot::from(df); + + let children = snapshot + .children() + .iter() + .map(|child| proto_converter.physical_expr_to_proto(child, codec)) + .collect::>>()?; + + let remapped_children = if let Some(remapped) = snapshot.remapped_children() { + remapped + .iter() + .map(|child| proto_converter.physical_expr_to_proto(child, codec)) + .collect::>>()? + } else { + vec![] + }; + + let inner_expr = Box::new( + proto_converter.physical_expr_to_proto(snapshot.inner_expr(), codec)?, + ); + + Ok(protobuf::PhysicalExprNode { + expr_id: None, + expr_type: Some(protobuf::physical_expr_node::ExprType::DynamicFilter( + Box::new(protobuf::PhysicalDynamicFilterNode { + children, + remapped_children, + generation: snapshot.generation(), + inner_expr: Some(inner_expr), + is_complete: snapshot.is_complete(), + }), + )), + }) } else { let mut buf: Vec = vec![]; - match codec.try_encode_expr(&value, &mut buf) { + match codec.try_encode_expr(value, &mut buf) { Ok(_) => { let inputs: Vec = value .children() diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 15639bcd25bdd..2c4a978a8577c 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -122,6 +122,7 @@ use datafusion_proto::physical_plan::{ PhysicalProtoConverterExtension, }; use datafusion_proto::protobuf; +use datafusion_proto::protobuf::physical_plan_node::PhysicalPlanType; use datafusion_proto::protobuf::{PhysicalExprNode, PhysicalPlanNode}; use prost::Message; @@ -130,6 +131,10 @@ use crate::cases::{ MyRegexUdfNode, }; +use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; +use datafusion_physical_expr::utils::reassign_expr_columns; +use datafusion_physical_expr_common::physical_expr::PhysicalExprId; + /// Perform a serde roundtrip and assert that the string representation of the before and after plans /// are identical. 
Note that this often isn't sufficient to guarantee that no information is /// lost during serde because the string representation of a plan often only shows a subset of state. @@ -3035,6 +3040,376 @@ fn test_deduplication_within_expr_deserialization() -> Result<()> { Ok(()) } +/// Create a [`DynamicFilterPhysicalExpr`] with child column expression "a" @ index 0. +fn make_dynamic_filter() -> Arc { + Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::new(Column::new("a", 0)) as Arc], + lit(true), + )) as Arc +} + +/// Update a [`DynamicFilterPhysicalExpr`]'s children to support child schema "b" @ 0, "a" @ 1. +fn make_reassigned_dynamic_filter( + filter: Arc, +) -> Result<(Arc, Arc)> { + let schema = Arc::new(Schema::new(vec![ + Field::new("b", DataType::Int64, false), + Field::new("a", DataType::Int64, false), + ])); + let reassigned = reassign_expr_columns(filter, &schema)?; + Ok((schema, reassigned)) +} + +/// Extract a [`PhysicalExprId`] from a [`PhysicalExpr`] proto. +fn proto_expr_id(expr: &PhysicalExprNode) -> PhysicalExprId { + let expr_id = expr + .expr_id + .as_ref() + .expect("physical expr should have expr_id"); + PhysicalExprId::new(expr_id.exact, expr_id.shallow) +} + +fn proto_binary_children( + expr: &PhysicalExprNode, +) -> (&PhysicalExprNode, &PhysicalExprNode) { + match expr.expr_type.as_ref() { + Some(protobuf::physical_expr_node::ExprType::BinaryExpr(binary)) => ( + binary + .l + .as_deref() + .expect("binary expr should have a left child"), + binary + .r + .as_deref() + .expect("binary expr should have a right child"), + ), + _ => panic!("Expected PhysicalExprNode::BinaryExpr"), + } +} + +fn roundtrip_dynamic_filter_expr_pair( + filter_expr_1: Arc, + filter_expr_2: Arc, + schema: Arc, +) -> Result<( + PhysicalExprId, + PhysicalExprId, + Arc, + Arc, +)> { + let pair_expr = Arc::new(BinaryExpr::new( + Arc::clone(&filter_expr_1), + Operator::And, + Arc::clone(&filter_expr_2), + )) as Arc; + + let codec = DefaultPhysicalExtensionCodec {}; + let 
converter = DeduplicatingProtoConverter {}; + let proto = converter.physical_expr_to_proto(&pair_expr, &codec)?; + let (left_proto, right_proto) = proto_binary_children(&proto); + + let left_proto_expr_id = proto_expr_id(left_proto); + let right_proto_expr_id = proto_expr_id(right_proto); + + let ctx = SessionContext::new(); + let deserialized_expr = converter.proto_to_physical_expr( + &proto, + ctx.task_ctx().as_ref(), + &schema, + &codec, + )?; + + let binary = deserialized_expr + .as_any() + .downcast_ref::() + .expect("Expected BinaryExpr"); + + Ok(( + left_proto_expr_id, + right_proto_expr_id, + Arc::clone(binary.left()), + Arc::clone(binary.right()), + )) +} + +fn roundtrip_dynamic_filter_plan_pair( + filter_expr_1: Arc, + filter_expr_2: Arc, + schema: Arc, +) -> Result<( + PhysicalExprId, + PhysicalExprId, + Arc, + Arc, +)> { + let empty_exec = Arc::new(EmptyExec::new(schema)) as Arc; + let filter_exec_1 = + Arc::new(FilterExec::try_new(Arc::clone(&filter_expr_1), empty_exec)?) + as Arc; + let filter_exec_2 = Arc::new(FilterExec::try_new( + Arc::clone(&filter_expr_2), + filter_exec_1, + )?) 
as Arc; + + let codec = DefaultPhysicalExtensionCodec {}; + let converter = DeduplicatingProtoConverter {}; + let proto = converter.execution_plan_to_proto(&filter_exec_2, &codec)?; + + let outer_filter = match &proto.physical_plan_type { + Some(PhysicalPlanType::Filter(outer_filter)) => outer_filter, + _ => panic!("Expected outer PhysicalPlanType::Filter"), + }; + let inner_filter = match &outer_filter.input { + Some(inner_input) => match &inner_input.physical_plan_type { + Some(PhysicalPlanType::Filter(inner_filter)) => inner_filter, + _ => panic!("Expected inner PhysicalPlanType::Filter"), + }, + _ => panic!("Expected inner input"), + }; + + let left_proto_expr_id = proto_expr_id( + inner_filter + .expr + .as_ref() + .expect("inner filter should have a predicate"), + ); + let right_proto_expr_id = proto_expr_id( + outer_filter + .expr + .as_ref() + .expect("outer filter should have a predicate"), + ); + + let ctx = SessionContext::new(); + let deserialized_plan = + converter.proto_to_execution_plan(ctx.task_ctx().as_ref(), &codec, &proto)?; + + let outer_filter = deserialized_plan + .as_any() + .downcast_ref::() + .expect("Expected outer FilterExec"); + let right_filter = Arc::clone(outer_filter.predicate()); + let inner_filter = outer_filter.children()[0] + .as_any() + .downcast_ref::() + .expect("Expected inner FilterExec"); + let left_filter = Arc::clone(inner_filter.predicate()); + + Ok(( + left_proto_expr_id, + right_proto_expr_id, + left_filter, + right_filter, + )) +} + +fn assert_exact_dedupe( + left_proto_expr_id: PhysicalExprId, + right_proto_expr_id: PhysicalExprId, + left_filter: &Arc, + right_filter: &Arc, +) -> Result<()> { + assert_eq!( + left_proto_expr_id.exact(), + right_proto_expr_id.exact(), + "arc-cloned dynamic filters should serialize with the same exact expr id", + ); + assert_eq!( + left_proto_expr_id.shallow(), + right_proto_expr_id.shallow(), + "arc-cloned dynamic filters should serialize with the same shallow expr id", + ); + let 
left_expr_id = Arc::clone(left_filter) + .expr_id(&[]) + .expect("deserialized filter should have expr_id"); + let right_expr_id = Arc::clone(right_filter) + .expr_id(&[]) + .expect("deserialized filter should have expr_id"); + assert_eq!(left_expr_id.exact(), right_expr_id.exact()); + assert_eq!(left_expr_id.shallow(), right_expr_id.shallow()); + assert_dynamic_filter_shared_state(left_filter, right_filter) +} + +fn assert_shallow_dedupe( + left_proto_expr_id: PhysicalExprId, + right_proto_expr_id: PhysicalExprId, + left_filter: &Arc, + right_filter: &Arc, +) -> Result<()> { + assert_ne!( + left_proto_expr_id.exact(), + right_proto_expr_id.exact(), + "rewritten dynamic filters should serialize with different exact expr ids", + ); + assert_eq!( + left_proto_expr_id.shallow(), + right_proto_expr_id.shallow(), + "rewritten dynamic filters should serialize with the same shallow expr id", + ); + let left_expr_id = Arc::clone(left_filter) + .expr_id(&[]) + .expect("deserialized filter should have expr_id"); + let right_expr_id = Arc::clone(right_filter) + .expr_id(&[]) + .expect("deserialized filter should have expr_id"); + assert_ne!(left_expr_id.exact(), right_expr_id.exact()); + assert_eq!(left_expr_id.shallow(), right_expr_id.shallow()); + assert_dynamic_filter_shared_state(left_filter, right_filter) +} + +fn assert_dynamic_filter_shared_state( + left_filter: &Arc, + right_filter: &Arc, +) -> Result<()> { + let left_filter = left_filter + .as_any() + .downcast_ref::() + .expect("Expected dynamic filter"); + let right_filter = right_filter + .as_any() + .downcast_ref::() + .expect("Expected dynamic filter"); + + left_filter.update(lit(123_i64))?; + + assert_eq!( + left_filter.snapshot_generation(), + right_filter.snapshot_generation(), + "linked dynamic filters should observe the same generation after an update", + ); + assert_eq!( + format!("{:?}", left_filter.current()?), + format!("{:?}", right_filter.current()?), + "updating one linked dynamic filter should 
update the other", + ); + + Ok(()) +} + +#[test] +fn test_dynamic_filter_roundtrip_dedupe() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let filter_expr_1 = make_dynamic_filter(); + let filter_expr_2 = Arc::clone(&filter_expr_1); + + let (left_proto_expr_id, right_proto_expr_id, left_filter, right_filter) = + roundtrip_dynamic_filter_expr_pair(filter_expr_1, filter_expr_2, schema)?; + + assert_exact_dedupe( + left_proto_expr_id, + right_proto_expr_id, + &left_filter, + &right_filter, + )?; + + Ok(()) +} + +#[test] +fn test_dynamic_filter_roundtrip_dedupe_shallow_expr_id() -> Result<()> { + let filter_expr_1 = make_dynamic_filter(); + let (schema, filter_expr_2) = + make_reassigned_dynamic_filter(Arc::clone(&filter_expr_1))?; + + let filter_2_before_roundtrip = filter_expr_2 + .as_any() + .downcast_ref::() + .expect("Expected dynamic filter"); + let expected_children = format!("{:?}", filter_2_before_roundtrip.children()); + let expected_current = format!("{:?}", filter_2_before_roundtrip.current()?); + + let (left_proto_expr_id, right_proto_expr_id, left_filter, right_filter) = + roundtrip_dynamic_filter_expr_pair(filter_expr_1, filter_expr_2, schema)?; + + let right_filter_dynamic = right_filter + .as_any() + .downcast_ref::() + .expect("Expected dynamic filter"); + assert_eq!( + expected_children, + format!("{:?}", right_filter_dynamic.children()), + "reassigned filter should preserve its transformed children after roundtrip", + ); + assert_eq!( + expected_current, + format!("{:?}", right_filter_dynamic.current()?), + "reassigned filter should preserve its visible predicate after roundtrip", + ); + + assert_shallow_dedupe( + left_proto_expr_id, + right_proto_expr_id, + &left_filter, + &right_filter, + )?; + + Ok(()) +} + +#[test] +fn test_dynamic_filter_plan_roundtrip_dedupe() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let filter_expr_1 = 
make_dynamic_filter(); + let filter_expr_2 = Arc::clone(&filter_expr_1); + + let (left_proto_expr_id, right_proto_expr_id, left_filter, right_filter) = + roundtrip_dynamic_filter_plan_pair(filter_expr_1, filter_expr_2, schema)?; + + assert_exact_dedupe( + left_proto_expr_id, + right_proto_expr_id, + &left_filter, + &right_filter, + )?; + + Ok(()) +} + +/// Roundtrip test for an execution plan where there are multiple instances of a dynamic filter +/// with different children. +#[test] +fn test_dynamic_filter_plan_roundtrip_dedupe_shallow_expr_id() -> Result<()> { + // filter_expr_1: original children + // filter_expr_2: has remapped children + let filter_expr_1 = make_dynamic_filter(); + let (schema, filter_expr_2) = + make_reassigned_dynamic_filter(Arc::clone(&filter_expr_1))?; + + let filter_2_before_roundtrip = filter_expr_2 + .as_any() + .downcast_ref::() + .expect("Expected dynamic filter"); + let expected_children = format!("{:?}", filter_2_before_roundtrip.children()); + let expected_current = format!("{:?}", filter_2_before_roundtrip.current()?); + + let (left_proto_expr_id, right_proto_expr_id, left_filter, right_filter) = + roundtrip_dynamic_filter_plan_pair(filter_expr_1, filter_expr_2, schema)?; + + let right_filter_dynamic = right_filter + .as_any() + .downcast_ref::() + .expect("Expected dynamic filter"); + assert_eq!( + expected_children, + format!("{:?}", right_filter_dynamic.children()), + "reassigned plan filter should preserve its transformed children after roundtrip", + ); + assert_eq!( + expected_current, + format!("{:?}", right_filter_dynamic.current()?), + "reassigned plan filter should preserve its visible predicate after roundtrip", + ); + + assert_shallow_dedupe( + left_proto_expr_id, + right_proto_expr_id, + &left_filter, + &right_filter, + )?; + + Ok(()) +} + /// Test that session_id rotates between top-level serialization operations. 
/// This verifies that each top-level serialization gets a fresh session_id, /// which prevents cross-process collisions when serialized plans are merged. @@ -3051,13 +3426,13 @@ fn test_session_id_rotation_between_serializations() -> Result<()> { // First serialization let proto1 = proto_converter.physical_expr_to_proto(&col_expr, &codec)?; - let expr_id1 = proto1.expr_id.expect("Expected expr_id to be set"); + let expr_id1 = proto_expr_id(&proto1); // Second serialization with the same converter // The session_id should have rotated, so the expr_id should be different // even though we're serializing the same expression (same pointer address) let proto2 = proto_converter.physical_expr_to_proto(&col_expr, &codec)?; - let expr_id2 = proto2.expr_id.expect("Expected expr_id to be set"); + let expr_id2 = proto_expr_id(&proto2); // The expr_ids should be different because session_id rotated assert_ne!( From b2eac547a5378d74df5d97e8833208f10e5e9c6d Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Sun, 12 Apr 2026 19:41:08 +0000 Subject: [PATCH 2/6] wip --- .../src/expressions/dynamic_filters.rs | 76 +--- .../proto/src/physical_plan/to_proto.rs | 8 - .../tests/cases/roundtrip_physical_plan.rs | 381 +++++++----------- 3 files changed, 175 insertions(+), 290 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index 2a408e86a8127..926b08e067b7d 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -1021,85 +1021,53 @@ mod test { } #[tokio::test] - async fn test_with_new_children_preserves_shared_inner_state() { + async fn test_expr_id() { // Create a source filter let source = Arc::new(DynamicFilterPhysicalExpr::new( vec![], lit(42) as Arc, )); + let source_clone = Arc::clone(&source); - // Create a target filter with different children + // Create a derived filter with different 
children let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)])); let col_x = col("x", &schema).unwrap(); - // Rebuild a wrapper from the source filter using the target's children. - let combined = Arc::clone(&source) + let derived = Arc::clone(&source) .with_new_children(vec![Arc::clone(&col_x)]) .unwrap(); - let combined_expr_id = Arc::clone(&combined) + let derived_expr_id = Arc::clone(&derived) .expr_id(&[]) .expect("combined filter should have an expr_id"); let source_expr_id = Arc::clone(&source) .expr_id(&[]) .expect("source filter should have an expr_id"); - assert_ne!( - combined_expr_id.exact(), - source_expr_id.exact(), - "with_new_children should produce a distinct outer Arc identity", - ); - assert_eq!( - combined_expr_id.shallow(), - source_expr_id.shallow(), - "dynamic filters with shared inner state should have the same shallow identity", - ); - - let combined_dyn_filter = combined - .as_any() - .downcast_ref::() - .unwrap(); + let source_clone_expr_id = Arc::clone(&source_clone) + .expr_id(&[]) + .expect("source clone should have an expr_id"); - // Verify the children are unchanged. assert_eq!( - format!("{:?}", combined.children()), - format!("{:?}", vec![col_x]), - "Combined filter's children should be unchanged" + source_clone_expr_id.exact(), + source_expr_id.exact(), + "cloned filter should have the same exact id because the outer Arc is the same", ); - // Verify inner expression changed to the one from source. assert_eq!( - format!("{:?}", combined_dyn_filter.current().unwrap()), - format!("{:?}", lit(42)), - "Combined filter should have inner expression from source filter" + source_clone_expr_id.shallow(), + source_expr_id.shallow(), + "cloned filter should have the same shallow id because the inner state is the same", ); - // Verify that completing one filter also completes the other. 
- let combined_binding = Arc::clone(&combined) as Arc; - #[expect(clippy::disallowed_methods)] - let wait_handle = tokio::task::spawn({ - async move { - let df = combined_binding - .as_any() - .downcast_ref::() - .unwrap(); - df.wait_complete().await; - format!("{:?}", df.current().unwrap()) - } - }); - source.update(lit(999) as Arc).unwrap(); - source.mark_complete(); - - // The linked filter should be notified via the shared watch - let result = tokio::time::timeout(std::time::Duration::from_secs(1), wait_handle) - .await - .expect( - "linked filter should have been notified of completion within timeout", - ) - .expect("task should not panic"); + assert_ne!( + derived_expr_id.exact(), + source_expr_id.exact(), + "filters should have different exact ids because the children are different", + ); assert_eq!( - result, - format!("{:?}", lit(999)), - "linked filter should see source's updated inner expression" + derived_expr_id.shallow(), + source_expr_id.shallow(), + "filters should have the same shallow id because they are the same expression", ); } } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index d7e4952c89078..03bda535fc611 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -33,7 +33,6 @@ use datafusion_datasource_parquet::file_format::ParquetSink; use datafusion_expr::WindowFrame; use datafusion_physical_expr::ScalarFunctionExpr; use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; -use datafusion_physical_expr_common::physical_expr::PhysicalExprId; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::expressions::{ BinaryExpr, CaseExpr, CastExpr, Column, DynamicFilterPhysicalExpr, @@ -54,13 +53,6 @@ use crate::protobuf::{ physical_aggregate_expr_node, physical_window_expr_node, }; -fn to_proto_expr_id(expr_id: PhysicalExprId) -> protobuf::PhysicalExprId { 
- protobuf::PhysicalExprId { - exact: expr_id.exact(), - shallow: expr_id.shallow(), - } -} - #[expect(clippy::needless_pass_by_value)] pub fn serialize_physical_aggr_expr( aggr_expr: Arc, diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 2c4a978a8577c..a5bacb4008861 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -122,7 +122,7 @@ use datafusion_proto::physical_plan::{ PhysicalProtoConverterExtension, }; use datafusion_proto::protobuf; -use datafusion_proto::protobuf::physical_plan_node::PhysicalPlanType; +use datafusion_proto::protobuf::physical_expr_node::ExprType; use datafusion_proto::protobuf::{PhysicalExprNode, PhysicalPlanNode}; use prost::Message; @@ -131,7 +131,9 @@ use crate::cases::{ MyRegexUdfNode, }; -use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; +use datafusion_physical_expr::expressions::{ + DynamicFilterPhysicalExpr, DynamicFilterSnapshot, +}; use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr_common::physical_expr::PhysicalExprId; @@ -3069,34 +3071,21 @@ fn proto_expr_id(expr: &PhysicalExprNode) -> PhysicalExprId { PhysicalExprId::new(expr_id.exact, expr_id.shallow) } -fn proto_binary_children( - expr: &PhysicalExprNode, -) -> (&PhysicalExprNode, &PhysicalExprNode) { - match expr.expr_type.as_ref() { - Some(protobuf::physical_expr_node::ExprType::BinaryExpr(binary)) => ( - binary - .l - .as_deref() - .expect("binary expr should have a left child"), - binary - .r - .as_deref() - .expect("binary expr should have a right child"), - ), - _ => panic!("Expected PhysicalExprNode::BinaryExpr"), - } -} - +/// Roundtrip a single physical expression shaped like: +/// +/// ```text +/// BinaryExpr(AND) +/// / \ +/// dynamic_filter_1 dynamic_filter_2 +/// ``` +/// +/// The resulting proto contains both dynamic filters under one 
serialized expr +/// tree, which lets the test inspect expr-id deduplication directly. fn roundtrip_dynamic_filter_expr_pair( filter_expr_1: Arc, filter_expr_2: Arc, schema: Arc, -) -> Result<( - PhysicalExprId, - PhysicalExprId, - Arc, - Arc, -)> { +) -> Result<(Arc, Arc)> { let pair_expr = Arc::new(BinaryExpr::new( Arc::clone(&filter_expr_1), Operator::And, @@ -3106,10 +3095,19 @@ fn roundtrip_dynamic_filter_expr_pair( let codec = DefaultPhysicalExtensionCodec {}; let converter = DeduplicatingProtoConverter {}; let proto = converter.physical_expr_to_proto(&pair_expr, &codec)?; - let (left_proto, right_proto) = proto_binary_children(&proto); - - let left_proto_expr_id = proto_expr_id(left_proto); - let right_proto_expr_id = proto_expr_id(right_proto); + match proto.expr_type.as_ref() { + Some(ExprType::BinaryExpr(binary)) => { + binary + .l + .as_deref() + .expect("binary expr should have a left child"); + binary + .r + .as_deref() + .expect("binary expr should have a right child"); + } + _ => panic!("Expected PhysicalExprNode::BinaryExpr"), + } let ctx = SessionContext::new(); let deserialized_expr = converter.proto_to_physical_expr( @@ -3124,140 +3122,98 @@ fn roundtrip_dynamic_filter_expr_pair( .downcast_ref::() .expect("Expected BinaryExpr"); - Ok(( - left_proto_expr_id, - right_proto_expr_id, - Arc::clone(binary.left()), - Arc::clone(binary.right()), - )) + Ok((Arc::clone(binary.left()), Arc::clone(binary.right()))) } -fn roundtrip_dynamic_filter_plan_pair( - filter_expr_1: Arc, - filter_expr_2: Arc, - schema: Arc, -) -> Result<( - PhysicalExprId, - PhysicalExprId, +/// Roundtrip an execution plan shaped like: +/// +/// ```text +/// FilterExec(dynamic_filter_1 on a@0) +/// ProjectionExec(a := Column("a", source_index)) +/// DataSourceExec +/// ParquetSource(predicate = dynamic_filter_2) +/// ``` +/// +/// The pushed-down dynamic filter has been rewritten with +/// `reassign_expr_columns`, so the source schema is `b@0, a@1` while the +/// consumer-side filter 
above the projection still runs against `a@0`. +fn roundtrip_dynamic_filter_plan_pair() -> Result<( + Arc, + Arc, Arc, Arc, )> { - let empty_exec = Arc::new(EmptyExec::new(schema)) as Arc; - let filter_exec_1 = - Arc::new(FilterExec::try_new(Arc::clone(&filter_expr_1), empty_exec)?) - as Arc; - let filter_exec_2 = Arc::new(FilterExec::try_new( - Arc::clone(&filter_expr_2), - filter_exec_1, + let filter_expr_1 = make_dynamic_filter(); + let (data_source_schema, filter_expr_2) = + make_reassigned_dynamic_filter(Arc::clone(&filter_expr_1))?; + let left_before = Arc::clone(&filter_expr_1); + let right_before = Arc::clone(&filter_expr_2); + let file_source = Arc::new( + ParquetSource::new(Arc::clone(&data_source_schema)) + .with_predicate(Arc::clone(&filter_expr_2)), + ); + let scan_config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new( + "/path/to/file.parquet".to_string(), + 1024, + )])]) + .build(); + let data_source_exec = + DataSourceExec::from_data_source(scan_config) as Arc; + + let projection_exec = Arc::new(ProjectionExec::try_new( + vec![ProjectionExpr { + expr: Arc::new(Column::new("a", 1)) as Arc, + alias: "a".to_string(), + }], + data_source_exec, + )?) as Arc; + let filter_exec = Arc::new(FilterExec::try_new( + Arc::clone(&filter_expr_1), + projection_exec, )?) 
as Arc; let codec = DefaultPhysicalExtensionCodec {}; let converter = DeduplicatingProtoConverter {}; - let proto = converter.execution_plan_to_proto(&filter_exec_2, &codec)?; - - let outer_filter = match &proto.physical_plan_type { - Some(PhysicalPlanType::Filter(outer_filter)) => outer_filter, - _ => panic!("Expected outer PhysicalPlanType::Filter"), - }; - let inner_filter = match &outer_filter.input { - Some(inner_input) => match &inner_input.physical_plan_type { - Some(PhysicalPlanType::Filter(inner_filter)) => inner_filter, - _ => panic!("Expected inner PhysicalPlanType::Filter"), - }, - _ => panic!("Expected inner input"), - }; - - let left_proto_expr_id = proto_expr_id( - inner_filter - .expr - .as_ref() - .expect("inner filter should have a predicate"), - ); - let right_proto_expr_id = proto_expr_id( - outer_filter - .expr - .as_ref() - .expect("outer filter should have a predicate"), - ); + let proto = converter.execution_plan_to_proto(&filter_exec, &codec)?; let ctx = SessionContext::new(); let deserialized_plan = converter.proto_to_execution_plan(ctx.task_ctx().as_ref(), &codec, &proto)?; let outer_filter = deserialized_plan + .as_ref() .as_any() .downcast_ref::() .expect("Expected outer FilterExec"); - let right_filter = Arc::clone(outer_filter.predicate()); - let inner_filter = outer_filter.children()[0] + let left_filter = Arc::clone(outer_filter.predicate()); + let projection = outer_filter.children()[0] .as_any() - .downcast_ref::() - .expect("Expected inner FilterExec"); - let left_filter = Arc::clone(inner_filter.predicate()); + .downcast_ref::() + .expect("Expected ProjectionExec"); + let data_source = projection + .input() + .as_any() + .downcast_ref::() + .expect("Expected DataSourceExec"); + let scan_config = data_source + .data_source() + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + let right_filter = scan_config + .file_source() + .filter() + .expect("Expected pushed-down predicate"); - Ok(( - left_proto_expr_id, - 
right_proto_expr_id, - left_filter, - right_filter, - )) + Ok((left_before, right_before, left_filter, right_filter)) } -fn assert_exact_dedupe( - left_proto_expr_id: PhysicalExprId, - right_proto_expr_id: PhysicalExprId, - left_filter: &Arc, - right_filter: &Arc, -) -> Result<()> { - assert_eq!( - left_proto_expr_id.exact(), - right_proto_expr_id.exact(), - "arc-cloned dynamic filters should serialize with the same exact expr id", - ); - assert_eq!( - left_proto_expr_id.shallow(), - right_proto_expr_id.shallow(), - "arc-cloned dynamic filters should serialize with the same shallow expr id", - ); - let left_expr_id = Arc::clone(left_filter) - .expr_id(&[]) - .expect("deserialized filter should have expr_id"); - let right_expr_id = Arc::clone(right_filter) - .expr_id(&[]) - .expect("deserialized filter should have expr_id"); - assert_eq!(left_expr_id.exact(), right_expr_id.exact()); - assert_eq!(left_expr_id.shallow(), right_expr_id.shallow()); - assert_dynamic_filter_shared_state(left_filter, right_filter) -} - -fn assert_shallow_dedupe( - left_proto_expr_id: PhysicalExprId, - right_proto_expr_id: PhysicalExprId, - left_filter: &Arc, - right_filter: &Arc, -) -> Result<()> { - assert_ne!( - left_proto_expr_id.exact(), - right_proto_expr_id.exact(), - "rewritten dynamic filters should serialize with different exact expr ids", - ); - assert_eq!( - left_proto_expr_id.shallow(), - right_proto_expr_id.shallow(), - "rewritten dynamic filters should serialize with the same shallow expr id", - ); - let left_expr_id = Arc::clone(left_filter) - .expr_id(&[]) - .expect("deserialized filter should have expr_id"); - let right_expr_id = Arc::clone(right_filter) - .expr_id(&[]) - .expect("deserialized filter should have expr_id"); - assert_ne!(left_expr_id.exact(), right_expr_id.exact()); - assert_eq!(left_expr_id.shallow(), right_expr_id.shallow()); - assert_dynamic_filter_shared_state(left_filter, right_filter) -} - -fn assert_dynamic_filter_shared_state( +/// Takes two 
[`DynamicFilterPhysicalExpr`] and asserts that updates to one are visible +/// via the other. This helps assert that referential integrity is maintained after +/// deserializing. +fn assert_dynamic_filter_update_is_visible( left_filter: &Arc, right_filter: &Arc, ) -> Result<()> { @@ -3286,81 +3242,56 @@ fn assert_dynamic_filter_shared_state( Ok(()) } +fn assert_dynamic_filter_snapshot_matches( + expected: &Arc, + actual: &Arc, +) { + let expected = expected + .as_any() + .downcast_ref::() + .expect("Expected dynamic filter"); + let actual = actual + .as_any() + .downcast_ref::() + .expect("Expected dynamic filter"); + + assert_eq!( + DynamicFilterSnapshot::from(expected).to_string(), + DynamicFilterSnapshot::from(actual).to_string(), + ); +} + #[test] fn test_dynamic_filter_roundtrip_dedupe() -> Result<()> { let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); let filter_expr_1 = make_dynamic_filter(); let filter_expr_2 = Arc::clone(&filter_expr_1); - let (left_proto_expr_id, right_proto_expr_id, left_filter, right_filter) = + let (left_filter, right_filter) = roundtrip_dynamic_filter_expr_pair(filter_expr_1, filter_expr_2, schema)?; - assert_exact_dedupe( - left_proto_expr_id, - right_proto_expr_id, - &left_filter, - &right_filter, - )?; + assert_dynamic_filter_snapshot_matches(&left_filter, &right_filter); + assert_dynamic_filter_update_is_visible(&left_filter, &right_filter)?; Ok(()) } #[test] -fn test_dynamic_filter_roundtrip_dedupe_shallow_expr_id() -> Result<()> { +fn test_dynamic_filter_roundtrip_dedupe_shallow() -> Result<()> { let filter_expr_1 = make_dynamic_filter(); + // Derive a new filter from the original by changing the children. 
let (schema, filter_expr_2) = make_reassigned_dynamic_filter(Arc::clone(&filter_expr_1))?; - let filter_2_before_roundtrip = filter_expr_2 - .as_any() - .downcast_ref::() - .expect("Expected dynamic filter"); - let expected_children = format!("{:?}", filter_2_before_roundtrip.children()); - let expected_current = format!("{:?}", filter_2_before_roundtrip.current()?); - - let (left_proto_expr_id, right_proto_expr_id, left_filter, right_filter) = + let filter_1_before_roundtrip = Arc::clone(&filter_expr_1); + let filter_2_before_roundtrip_expr = Arc::clone(&filter_expr_2); + let (left_filter, right_filter) = roundtrip_dynamic_filter_expr_pair(filter_expr_1, filter_expr_2, schema)?; - let right_filter_dynamic = right_filter - .as_any() - .downcast_ref::() - .expect("Expected dynamic filter"); - assert_eq!( - expected_children, - format!("{:?}", right_filter_dynamic.children()), - "reassigned filter should preserve its transformed children after roundtrip", - ); - assert_eq!( - expected_current, - format!("{:?}", right_filter_dynamic.current()?), - "reassigned filter should preserve its visible predicate after roundtrip", - ); - - assert_shallow_dedupe( - left_proto_expr_id, - right_proto_expr_id, - &left_filter, - &right_filter, - )?; - - Ok(()) -} - -#[test] -fn test_dynamic_filter_plan_roundtrip_dedupe() -> Result<()> { - let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); - let filter_expr_1 = make_dynamic_filter(); - let filter_expr_2 = Arc::clone(&filter_expr_1); - - let (left_proto_expr_id, right_proto_expr_id, left_filter, right_filter) = - roundtrip_dynamic_filter_plan_pair(filter_expr_1, filter_expr_2, schema)?; + assert_dynamic_filter_snapshot_matches(&filter_1_before_roundtrip, &left_filter); + assert_dynamic_filter_snapshot_matches(&filter_2_before_roundtrip_expr, &right_filter); - assert_exact_dedupe( - left_proto_expr_id, - right_proto_expr_id, - &left_filter, - &right_filter, - )?; + 
assert_dynamic_filter_update_is_visible(&left_filter, &right_filter)?; Ok(()) } @@ -3369,43 +3300,37 @@ fn test_dynamic_filter_plan_roundtrip_dedupe() -> Result<()> { /// with different children. #[test] fn test_dynamic_filter_plan_roundtrip_dedupe_shallow_expr_id() -> Result<()> { - // filter_expr_1: original children - // filter_expr_2: has remapped children - let filter_expr_1 = make_dynamic_filter(); - let (schema, filter_expr_2) = - make_reassigned_dynamic_filter(Arc::clone(&filter_expr_1))?; - - let filter_2_before_roundtrip = filter_expr_2 - .as_any() - .downcast_ref::() - .expect("Expected dynamic filter"); - let expected_children = format!("{:?}", filter_2_before_roundtrip.children()); - let expected_current = format!("{:?}", filter_2_before_roundtrip.current()?); - - let (left_proto_expr_id, right_proto_expr_id, left_filter, right_filter) = - roundtrip_dynamic_filter_plan_pair(filter_expr_1, filter_expr_2, schema)?; + let ( + _left_filter_before_roundtrip, + right_filter_before_roundtrip, + left_filter, + right_filter, + ) = roundtrip_dynamic_filter_plan_pair()?; - let right_filter_dynamic = right_filter - .as_any() - .downcast_ref::() - .expect("Expected dynamic filter"); assert_eq!( - expected_children, - format!("{:?}", right_filter_dynamic.children()), - "reassigned plan filter should preserve its transformed children after roundtrip", + "DynamicFilterSnapshot { children: [Column { name: \"a\", index: 0 }], remapped_children: Some([Column { name: \"a\", index: 1 }]), generation: 1, inner_expr: Literal { value: Boolean(true), field: Field { name: \"lit\", data_type: Boolean } }, is_complete: false }", + DynamicFilterSnapshot::from( + right_filter_before_roundtrip + .as_any() + .downcast_ref::() + .expect("Expected dynamic filter"), + ) + .to_string(), + "expected the pushed-down filter to start with remapped children against the scan schema", ); assert_eq!( - expected_current, - format!("{:?}", right_filter_dynamic.current()?), - "reassigned plan 
filter should preserve its visible predicate after roundtrip", + "DynamicFilterSnapshot { children: [Column { name: \"a\", index: 0 }], remapped_children: Some([Column { name: \"a\", index: 1 }]), generation: 1, inner_expr: Literal { value: Boolean(true), field: Field { name: \"lit\", data_type: Boolean } }, is_complete: false }", + DynamicFilterSnapshot::from( + right_filter + .as_any() + .downcast_ref::() + .expect("Expected dynamic filter"), + ) + .to_string(), + "expected roundtrip to preserve the scan-side remapped children", ); - assert_shallow_dedupe( - left_proto_expr_id, - right_proto_expr_id, - &left_filter, - &right_filter, - )?; + assert_dynamic_filter_update_is_visible(&left_filter, &right_filter)?; Ok(()) } From fc72ecab3e456fc1494176ca1df506c9851aee38 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Sun, 12 Apr 2026 19:47:57 +0000 Subject: [PATCH 3/6] wip --- .../proto/src/physical_plan/to_proto.rs | 40 +++++++++---------- .../tests/cases/roundtrip_physical_plan.rs | 8 +++- 2 files changed, 25 insertions(+), 23 deletions(-) diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 03bda535fc611..aee564a529ca2 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -72,7 +72,6 @@ pub fn serialize_physical_aggr_expr( codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?; Ok(protobuf::PhysicalExprNode { expr_id: None, - expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr( protobuf::PhysicalAggregateExprNode { aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name)), @@ -258,6 +257,7 @@ pub fn serialize_physical_expr_with_converter( codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result { + let expr = value.as_any(); // HashTableLookupExpr is used for dynamic filter pushdown in hash joins. 
// It contains an Arc (the build-side hash table) which // cannot be serialized - the hash table is a runtime structure built during @@ -271,11 +271,7 @@ pub fn serialize_physical_expr_with_converter( // // In distributed execution, the remote worker won't have access to the hash // table anyway, so the best we can do is skip this optimization. - if value - .as_any() - .downcast_ref::() - .is_some() - { + if expr.downcast_ref::().is_some() { let value = datafusion_proto_common::ScalarValue { value: Some(datafusion_proto_common::scalar_value::Value::BoolValue( true, @@ -287,7 +283,7 @@ pub fn serialize_physical_expr_with_converter( }); } - if let Some(expr) = value.as_any().downcast_ref::() { + if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::Column( @@ -297,7 +293,7 @@ pub fn serialize_physical_expr_with_converter( }, )), }) - } else if let Some(expr) = value.as_any().downcast_ref::() { + } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn( @@ -306,7 +302,7 @@ pub fn serialize_physical_expr_with_converter( }, )), }) - } else if let Some(expr) = value.as_any().downcast_ref::() { + } else if let Some(expr) = expr.downcast_ref::() { let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode { l: Some(Box::new( proto_converter.physical_expr_to_proto(expr.left(), codec)?, @@ -323,7 +319,7 @@ pub fn serialize_physical_expr_with_converter( binary_expr, )), }) - } else if let Some(expr) = value.as_any().downcast_ref::() { + } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, expr_type: Some( @@ -366,7 +362,7 @@ pub fn serialize_physical_expr_with_converter( ), ), }) - } else if let Some(expr) = value.as_any().downcast_ref::() { + } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { 
expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new( @@ -377,7 +373,7 @@ pub fn serialize_physical_expr_with_converter( }, ))), }) - } else if let Some(expr) = value.as_any().downcast_ref::() { + } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr( @@ -388,7 +384,7 @@ pub fn serialize_physical_expr_with_converter( }), )), }) - } else if let Some(expr) = value.as_any().downcast_ref::() { + } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr( @@ -399,7 +395,7 @@ pub fn serialize_physical_expr_with_converter( }), )), }) - } else if let Some(expr) = value.as_any().downcast_ref::() { + } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::InList(Box::new( @@ -412,7 +408,7 @@ pub fn serialize_physical_expr_with_converter( }, ))), }) - } else if let Some(expr) = value.as_any().downcast_ref::() { + } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::Negative(Box::new( @@ -423,14 +419,14 @@ pub fn serialize_physical_expr_with_converter( }, ))), }) - } else if let Some(lit) = value.as_any().downcast_ref::() { + } else if let Some(lit) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::Literal( lit.value().try_into()?, )), }) - } else if let Some(cast) = value.as_any().downcast_ref::() { + } else if let Some(cast) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new( @@ -442,7 +438,7 @@ pub fn serialize_physical_expr_with_converter( }, ))), }) - } else 
if let Some(cast) = value.as_any().downcast_ref::() { + } else if let Some(cast) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new( @@ -454,7 +450,7 @@ pub fn serialize_physical_expr_with_converter( }, ))), }) - } else if let Some(expr) = value.as_any().downcast_ref::() { + } else if let Some(expr) = expr.downcast_ref::() { let mut buf = Vec::new(); codec.try_encode_udf(expr.fun(), &mut buf)?; Ok(protobuf::PhysicalExprNode { @@ -473,7 +469,7 @@ pub fn serialize_physical_expr_with_converter( }, )), }) - } else if let Some(expr) = value.as_any().downcast_ref::() { + } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::LikeExpr(Box::new( @@ -489,7 +485,7 @@ pub fn serialize_physical_expr_with_converter( }, ))), }) - } else if let Some(expr) = value.as_any().downcast_ref::() { + } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::HashExpr( @@ -504,7 +500,7 @@ pub fn serialize_physical_expr_with_converter( }, )), }) - } else if let Some(df) = value.as_any().downcast_ref::() { + } else if let Some(df) = expr.downcast_ref::() { // Capture all state atomically, including the internal id. 
let snapshot = DynamicFilterSnapshot::from(df); diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index a5bacb4008861..be2dd6e68f768 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -130,7 +130,6 @@ use crate::cases::{ CustomUDWF, CustomUDWFNode, MyAggregateUDF, MyAggregateUdfNode, MyRegexUdf, MyRegexUdfNode, }; - use datafusion_physical_expr::expressions::{ DynamicFilterPhysicalExpr, DynamicFilterSnapshot, }; @@ -3137,6 +3136,13 @@ fn roundtrip_dynamic_filter_expr_pair( /// The pushed-down dynamic filter has been rewritten with /// `reassign_expr_columns`, so the source schema is `b@0, a@1` while the /// consumer-side filter above the projection still runs against `a@0`. +/// +/// Returns `(left_before, right_before, left_after, right_after)` where: +/// +/// - `left_before` is the filter above the projection before serialization +/// - `right_before` is the pushed-down scan filter before serialization +/// - `left_after` is the deserialized filter above the projection +/// - `right_after` is the deserialized pushed-down scan filter fn roundtrip_dynamic_filter_plan_pair() -> Result<( Arc, Arc, From e0ec773c7f976469b83010cebb9954d90933c05d Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Tue, 14 Apr 2026 17:15:10 +0000 Subject: [PATCH 4/6] some ci errors --- datafusion/proto/tests/cases/roundtrip_physical_plan.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index be2dd6e68f768..9c9770638757e 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -3195,11 +3195,13 @@ fn roundtrip_dynamic_filter_plan_pair() -> Result<( .expect("Expected outer FilterExec"); let left_filter = 
Arc::clone(outer_filter.predicate()); let projection = outer_filter.children()[0] + .as_ref() .as_any() .downcast_ref::() .expect("Expected ProjectionExec"); let data_source = projection .input() + .as_ref() .as_any() .downcast_ref::() .expect("Expected DataSourceExec"); @@ -3295,7 +3297,10 @@ fn test_dynamic_filter_roundtrip_dedupe_shallow() -> Result<()> { roundtrip_dynamic_filter_expr_pair(filter_expr_1, filter_expr_2, schema)?; assert_dynamic_filter_snapshot_matches(&filter_1_before_roundtrip, &left_filter); - assert_dynamic_filter_snapshot_matches(&filter_2_before_roundtrip_expr, &right_filter); + assert_dynamic_filter_snapshot_matches( + &filter_2_before_roundtrip_expr, + &right_filter, + ); assert_dynamic_filter_update_is_visible(&left_filter, &right_filter)?; From 61220c89304a49b769edb19c0bd85e9f73bdd36f Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Tue, 14 Apr 2026 17:30:02 +0000 Subject: [PATCH 5/6] fix --- datafusion/proto/tests/cases/roundtrip_physical_plan.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 9c9770638757e..8bf94a2f932e5 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -3190,19 +3190,16 @@ fn roundtrip_dynamic_filter_plan_pair() -> Result<( let outer_filter = deserialized_plan .as_ref() - .as_any() .downcast_ref::() .expect("Expected outer FilterExec"); let left_filter = Arc::clone(outer_filter.predicate()); let projection = outer_filter.children()[0] .as_ref() - .as_any() .downcast_ref::() .expect("Expected ProjectionExec"); let data_source = projection .input() .as_ref() - .as_any() .downcast_ref::() .expect("Expected DataSourceExec"); let scan_config = data_source From d64c33d49b74eb7dc4d54c74607c0422b39e1818 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Tue, 14 Apr 2026 17:42:41 +0000 Subject: [PATCH 6/6] 
clippy --- datafusion/proto/tests/cases/roundtrip_physical_plan.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 8bf94a2f932e5..7457ef4f8871e 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -3143,12 +3143,14 @@ fn roundtrip_dynamic_filter_expr_pair( /// - `right_before` is the pushed-down scan filter before serialization /// - `left_after` is the deserialized filter above the projection /// - `right_after` is the deserialized pushed-down scan filter -fn roundtrip_dynamic_filter_plan_pair() -> Result<( +type DynamicFilterExprPair = ( Arc, Arc, Arc, Arc, -)> { +); + +fn roundtrip_dynamic_filter_plan_pair() -> Result { let filter_expr_1 = make_dynamic_filter(); let (data_source_schema, filter_expr_2) = make_reassigned_dynamic_filter(Arc::clone(&filter_expr_1))?;