Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion datafusion/physical-expr-common/src/physical_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use std::any::Any;
use std::fmt;
use std::fmt::{Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::sync::Arc;

use crate::utils::scatter;
Expand Down Expand Up @@ -441,6 +441,50 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
fn placement(&self) -> ExpressionPlacement {
ExpressionPlacement::KeepInPlace
}

/// Returns a composite [`PhysicalExprId`] identifying this expression.
///
/// The default implementation derives the exact id from the address of the
/// `Arc` allocation, mixed with `salt`, via [`expr_id_from_arc`], and reports
/// no shallow id. Because the id is address-based, it is no longer valid once
/// the expression has been dropped.
fn expr_id(self: Arc<Self>, salt: &[u64]) -> Option<PhysicalExprId> {
    let exact = expr_id_from_arc(&self, salt);
    Some(PhysicalExprId::new(exact, None))
}
}

/// Composite identifier attached to a [`PhysicalExpr`].
///
/// It pairs an `exact` id, covering the full expression tree, with an
/// optional `shallow` id that covers only the expression root.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PhysicalExprId {
    /// Identifier of the full expression tree, including children.
    exact: u64,
    /// Identifier of the expression root only, when one is available.
    shallow: Option<u64>,
}

impl PhysicalExprId {
    /// Builds a [`PhysicalExprId`] from its two components.
    ///
    /// Both ids are expected to be globally unique within a process.
    pub fn new(exact: u64, shallow: Option<u64>) -> Self {
        PhysicalExprId { exact, shallow }
    }

    /// The identifier for the full expression tree, including children.
    pub fn exact(&self) -> u64 {
        let PhysicalExprId { exact, .. } = *self;
        exact
    }

    /// The identifier for just the expression root, ignoring children, or
    /// `None` when no such id was supplied.
    pub fn shallow(&self) -> Option<u64> {
        let PhysicalExprId { shallow, .. } = *self;
        shallow
    }
}

/// Derives a process-local identifier for the allocation behind an [`Arc`].
///
/// The address of the pointee is hashed first, followed by every element of
/// `salt` in order, so the same allocation hashed with a different salt
/// yields a different id. The result is a hash of the address rather than the
/// address itself, and it is only meaningful while the allocation is alive.
pub fn expr_id_from_arc<T: ?Sized>(expr: &Arc<T>, salt: &[u64]) -> u64 {
    // Casting through `*const ()` discards any fat-pointer metadata, leaving
    // only the data address to be hashed.
    let address = Arc::as_ptr(expr) as *const () as u64;

    let mut state = DefaultHasher::new();
    address.hash(&mut state);
    salt.iter().for_each(|word| word.hash(&mut state));
    state.finish()
}

#[deprecated(
Expand Down
205 changes: 204 additions & 1 deletion datafusion/physical-expr/src/expressions/dynamic_filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use datafusion_common::{
tree_node::{Transformed, TransformedResult, TreeNode},
};
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr_common::physical_expr::DynHash;
use datafusion_physical_expr_common::physical_expr::{
DynHash, PhysicalExprId, expr_id_from_arc,
};

/// State of a dynamic filter, tracking both updates and completion.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -88,6 +90,116 @@ struct Inner {
is_complete: bool,
}

/// An atomic snapshot of a [`DynamicFilterPhysicalExpr`] used to reconstruct the expression during
/// serialization / deserialization.
pub struct DynamicFilterSnapshot {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this type because I thought it looked cleaner / simpler than having getters on the DynamicFilterPhysicalExpr itself.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should just add the getters and setters for the internal fields...

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we end up exposing all of the internals anyway: if there was a way to return a dyn Serializable it'd be one thing, but the proto machinery still needs to know all of the fields to serialize.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also think the name snapshot makes it sound like this is what would be returned from PhysicalExpr::snapshot, but that is not the case.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be... you kind of already have this, just via an intermediate structure instead

/// Copy of the `children` field of the source `DynamicFilterPhysicalExpr`.
children: Vec<Arc<dyn PhysicalExpr>>,
/// Copy of the `remapped_children` field of the source expression; `None`
/// when the source has none.
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
// Inner state.
/// Generation read from the source expression's inner state. On
/// reconstruction it also seeds the initial `FilterState`.
generation: u64,
/// The expression held in the source's inner state when the snapshot was taken.
inner_expr: Arc<dyn PhysicalExpr>,
/// Completion flag from the inner state. On reconstruction it selects
/// `FilterState::Complete` over `FilterState::InProgress`.
is_complete: bool,
}

impl DynamicFilterSnapshot {
pub fn new(
children: Vec<Arc<dyn PhysicalExpr>>,
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
generation: u64,
inner_expr: Arc<dyn PhysicalExpr>,
is_complete: bool,
) -> Self {
Self {
children,
remapped_children,
generation,
inner_expr,
is_complete,
}
}

pub fn children(&self) -> &[Arc<dyn PhysicalExpr>] {
&self.children
}

pub fn remapped_children(&self) -> Option<&[Arc<dyn PhysicalExpr>]> {
self.remapped_children.as_deref()
}

pub fn generation(&self) -> u64 {
self.generation
}

pub fn inner_expr(&self) -> &Arc<dyn PhysicalExpr> {
&self.inner_expr
}

pub fn is_complete(&self) -> bool {
self.is_complete
}
}
// Single-line textual rendering of a snapshot. The round-trip test in this
// file compares these strings to check that a reconstructed filter matches
// the original.
impl Display for DynamicFilterSnapshot {
/// Writes all five fields in declaration order. The expression-valued fields
/// (`children`, `remapped_children`, `inner_expr`) are rendered with `{:?}`,
/// while `generation` and `is_complete` use `{}`.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"DynamicFilterSnapshot {{ children: {:?}, remapped_children: {:?}, generation: {}, inner_expr: {:?}, is_complete: {} }}",
self.children,
self.remapped_children,
self.generation,
self.inner_expr,
self.is_complete
)
}
}

impl From<DynamicFilterSnapshot> for DynamicFilterPhysicalExpr {
fn from(snapshot: DynamicFilterSnapshot) -> Self {
let DynamicFilterSnapshot {
children,
remapped_children,
generation,
inner_expr,
is_complete,
} = snapshot;

let state = if is_complete {
FilterState::Complete { generation }
} else {
FilterState::InProgress { generation }
};
let (state_watch, _) = watch::channel(state);

Self {
children,
remapped_children,
inner: Arc::new(RwLock::new(Inner {
generation,
expr: inner_expr,
is_complete,
})),
state_watch,
data_type: Arc::new(RwLock::new(None)),
nullable: Arc::new(RwLock::new(None)),
}
}
}

impl From<&DynamicFilterPhysicalExpr> for DynamicFilterSnapshot {
fn from(expr: &DynamicFilterPhysicalExpr) -> Self {
let (generation, inner_expr, is_complete) = {
let inner = expr.inner.read();
(inner.generation, Arc::clone(&inner.expr), inner.is_complete)
};
DynamicFilterSnapshot {
children: expr.children.clone(),
remapped_children: expr.remapped_children.clone(),
generation,
inner_expr,
is_complete,
}
}
}

impl Inner {
fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
Self {
Expand Down Expand Up @@ -448,6 +560,13 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
// Return the current generation of the expression.
self.inner.read().generation
}

/// Returns an id with both components populated.
///
/// The exact id is derived from this expression's own `Arc`, and the shallow
/// id from the `Arc` holding the inner state. The `test_expr_id` test in this
/// file expects a filter derived via `with_new_children` to report the same
/// shallow id as its source but a different exact id.
fn expr_id(self: Arc<Self>, salt: &[u64]) -> Option<PhysicalExprId> {
    let exact = expr_id_from_arc(&self, salt);
    let shallow = expr_id_from_arc(&self.inner, salt);
    Some(PhysicalExprId::new(exact, Some(shallow)))
}
}

#[cfg(test)]
Expand Down Expand Up @@ -867,4 +986,88 @@ mod test {
"Hash should be stable after update (identity-based)"
);
}

/// A filter rebuilt from its snapshot must render the same snapshot as the
/// filter it was taken from.
#[test]
fn test_current_snapshot_roundtrip() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let column = col("a", &schema).unwrap();

    // Start from `a > 10`, with `a` registered as the only child.
    let initial: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::clone(&column),
        datafusion_expr::Operator::Gt,
        lit(10) as Arc<dyn PhysicalExpr>,
    ));
    let original = DynamicFilterPhysicalExpr::new(vec![Arc::clone(&column)], initial);

    // Move the filter away from its initial state: new expression, then complete.
    original
        .update(lit(42) as Arc<dyn PhysicalExpr>)
        .expect("Update should succeed");
    original.mark_complete();

    // Snapshot -> expression round trip.
    let rebuilt = DynamicFilterPhysicalExpr::from(DynamicFilterSnapshot::from(&original));

    // Compare via the snapshots' string representations.
    let expected = DynamicFilterSnapshot::from(&original).to_string();
    let actual = DynamicFilterSnapshot::from(&rebuilt).to_string();
    assert_eq!(expected, actual);
}

/// Checks how `expr_id` behaves for the same filter reached through two
/// handles, and for a filter derived from it with different children.
///
/// Nothing here awaits, so this is a plain synchronous test; no async runtime
/// is needed to construct the filter or compute its ids.
#[test]
fn test_expr_id() {
    // Source filter, plus a second handle to the very same allocation.
    let source = Arc::new(DynamicFilterPhysicalExpr::new(
        vec![],
        lit(42) as Arc<dyn PhysicalExpr>,
    ));
    let source_clone = Arc::clone(&source);

    // Derive a filter with different children from the source.
    let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
    let col_x = col("x", &schema).unwrap();
    let derived = Arc::clone(&source)
        .with_new_children(vec![Arc::clone(&col_x)])
        .unwrap();

    // `expr_id` consumes the `Arc`, so each call works on a fresh handle.
    let derived_expr_id = Arc::clone(&derived)
        .expr_id(&[])
        .expect("derived filter should have an expr_id");
    let source_expr_id = Arc::clone(&source)
        .expr_id(&[])
        .expect("source filter should have an expr_id");
    let source_clone_expr_id = Arc::clone(&source_clone)
        .expr_id(&[])
        .expect("source clone should have an expr_id");

    assert_eq!(
        source_clone_expr_id.exact(),
        source_expr_id.exact(),
        "cloned filter should have the same exact id because the outer Arc is the same",
    );

    assert_eq!(
        source_clone_expr_id.shallow(),
        source_expr_id.shallow(),
        "cloned filter should have the same shallow id because the inner state is the same",
    );

    assert_ne!(
        derived_expr_id.exact(),
        source_expr_id.exact(),
        "filters should have different exact ids because the children are different",
    );

    assert_eq!(
        derived_expr_id.shallow(),
        source_expr_id.shallow(),
        "filters should have the same shallow id because they are the same expression",
    );
}
}
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub use cast::{CastExpr, cast};
pub use cast_column::CastColumnExpr;
pub use column::{Column, col, with_new_schema};
pub use datafusion_expr::utils::format_state_name;
pub use dynamic_filters::DynamicFilterPhysicalExpr;
pub use dynamic_filters::{DynamicFilterPhysicalExpr, DynamicFilterSnapshot};
pub use in_list::{InListExpr, in_list};
pub use is_not_null::{IsNotNullExpr, is_not_null};
pub use is_null::{IsNullExpr, is_null};
Expand Down
27 changes: 19 additions & 8 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -871,17 +871,18 @@ message PhysicalExtensionNode {
}

// physical expressions
// Composite identifier carried in the `expr_id` field of PhysicalExprNode.
// Presumably the wire form of the Rust `PhysicalExprId` struct - confirm
// against the proto conversion code.
message PhysicalExprId {
// Identifier for the full expression tree, including children.
uint64 exact = 1;
// Identifier for just the expression root, ignoring children. Declared
// `optional`, so it may be absent.
optional uint64 shallow = 2;
}

message PhysicalExprNode {
// Was date_time_interval_expr
reserved 17;

// Unique identifier for this expression to do deduplication during deserialization.
// When serializing, this is set to a unique identifier for each combination of
// expression, process and serialization run.
// When deserializing, if this ID has been seen before, the cached Arc is returned
// instead of creating a new one, enabling reconstruction of referential integrity
// across serde roundtrips.
optional uint64 expr_id = 30;
// Unique identifier for this expression used during deserialization to restore
// referential integrity across serde roundtrips.
PhysicalExprId expr_id = 31;

oneof ExprType {
// column references
Expand Down Expand Up @@ -920,9 +921,19 @@ message PhysicalExprNode {
UnknownColumn unknown_column = 20;

PhysicalHashExprNode hash_expr = 21;

PhysicalDynamicFilterNode dynamic_filter = 22;
}
}

// Serialized form of a dynamic filter expression. The fields line up
// one-to-one with the Rust `DynamicFilterSnapshot` type (presumably that is
// the intended source and target - confirm against the conversion code).
message PhysicalDynamicFilterNode {
// Child expressions of the filter.
repeated PhysicalExprNode children = 1;
// Remapped child expressions.
// NOTE(review): a `repeated` field cannot distinguish "absent" from "empty",
// whereas the Rust snapshot models this as `Option<Vec<..>>`. Confirm how
// `None` versus `Some(vec![])` is meant to round-trip.
repeated PhysicalExprNode remapped_children = 2;
// Generation counter of the filter's inner state.
uint64 generation = 3;
// The filter's current inner expression.
PhysicalExprNode inner_expr = 4;
// Whether the filter had been marked complete.
bool is_complete = 5;
}

message PhysicalScalarUdfNode {
string name = 1;
repeated PhysicalExprNode args = 2;
Expand Down Expand Up @@ -1470,4 +1481,4 @@ message AsyncFuncExecNode {
// Plan node for a buffering operator (presumably the Rust `BufferExec` -
// confirm against the physical-plan conversion code).
message BufferExecNode {
// The child plan.
PhysicalPlanNode input = 1;
// Buffer capacity. NOTE(review): the unit (bytes, rows, batches) is not
// visible from this file - confirm and document it here.
uint64 capacity = 2;
}
}
Loading
Loading