Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 0 additions & 15 deletions crates/core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1072,21 +1072,6 @@ impl PySessionContext {
self.ctx.catalog_names().into_iter().collect()
}

/// Return the names of all tables registered with this session context.
///
/// Walks every catalog known to the context, then every schema within each
/// catalog, and gathers the table names from all schemas into a single
/// de-duplicated `HashSet`.
pub fn tables(&self) -> HashSet<String> {
self.ctx
.catalog_names()
.into_iter()
// A catalog may vanish between listing and lookup; silently skip misses.
.filter_map(|name| self.ctx.catalog(&name))
.flat_map(move |catalog| {
// Same defensive lookup for schemas within each catalog.
catalog
.schema_names()
.into_iter()
.filter_map(move |name| catalog.schema(&name))
})
.flat_map(|schema| schema.table_names())
.collect()
}

pub fn table(&self, name: &str, py: Python) -> PyResult<PyDataFrame> {
let res = wait_for_future(py, self.ctx.table(name))
.map_err(|e| PyKeyError::new_err(e.to_string()))?;
Expand Down
29 changes: 3 additions & 26 deletions crates/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,17 +468,17 @@ impl PyDataFrame {
fn __getitem__(&self, key: Bound<'_, PyAny>) -> PyDataFusionResult<Self> {
if let Ok(key) = key.extract::<PyBackedStr>() {
// df[col]
self.select_columns(vec![key])
self.select_exprs(vec![key])
} else if let Ok(tuple) = key.cast::<PyTuple>() {
// df[col1, col2, col3]
let keys = tuple
.iter()
.map(|item| item.extract::<PyBackedStr>())
.collect::<PyResult<Vec<PyBackedStr>>>()?;
self.select_columns(keys)
self.select_exprs(keys)
} else if let Ok(keys) = key.extract::<Vec<PyBackedStr>>() {
// df[[col1, col2, col3]]
self.select_columns(keys)
self.select_exprs(keys)
} else {
let message = "DataFrame can only be indexed by string index or indices".to_string();
Err(PyDataFusionError::Common(message))
Expand Down Expand Up @@ -554,13 +554,6 @@ impl PyDataFrame {
Ok(PyTable::from(table_provider))
}

/// Select the given columns by name, returning a new `DataFrame`.
///
/// Exposed to Python as a variadic function (`df.select_columns("a", "b")`).
/// Errors from the underlying DataFusion `select_columns` (e.g. an unknown
/// column name) propagate as `PyDataFusionResult` errors.
#[pyo3(signature = (*args))]
fn select_columns(&self, args: Vec<PyBackedStr>) -> PyDataFusionResult<Self> {
// Borrow the backing strings as &str for the DataFusion API.
let args = args.iter().map(|s| s.as_ref()).collect::<Vec<&str>>();
let df = self.df.as_ref().clone().select_columns(&args)?;
Ok(Self::new(df))
}

Comment on lines -557 to -563
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strange that this is deprecated in datafusion-python but not in datafusion: https://docs.rs/datafusion/53.0.0/datafusion/dataframe/struct.DataFrame.html#method.select_columns

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we can take both arguments in select vs select_columns, we deprecated select_columns a long time ago because it's more pythonic to have just one interface.

#[pyo3(signature = (*args))]
fn select_exprs(&self, args: Vec<PyBackedStr>) -> PyDataFusionResult<Self> {
let args = args.iter().map(|s| s.as_ref()).collect::<Vec<&str>>();
Expand Down Expand Up @@ -890,22 +883,6 @@ impl PyDataFrame {
Ok(Self::new(new_df))
}

/// Unnest a single list/struct column into one row per element.
///
/// * `column` - name of the column to unnest.
/// * `preserve_nulls` - when true, rows whose value is null are kept
///   (as a null element) rather than dropped.
/// * `recursions` - optional per-column recursion settings, passed through
///   to `build_unnest_options`; presumably `(input, output, depth)` triples —
///   TODO(review): confirm against `build_unnest_options`.
#[pyo3(signature = (column, preserve_nulls=true, recursions=None))]
fn unnest_column(
&self,
column: &str,
preserve_nulls: bool,
recursions: Option<Vec<(String, String, usize)>>,
) -> PyDataFusionResult<Self> {
let unnest_options = build_unnest_options(preserve_nulls, recursions);
// Single-column case delegates to the multi-column DataFusion API.
let df = self
.df
.as_ref()
.clone()
.unnest_columns_with_options(&[column], unnest_options)?;
Ok(Self::new(df))
}

#[pyo3(signature = (columns, preserve_nulls=true, recursions=None))]
fn unnest_columns(
&self,
Expand Down
133 changes: 3 additions & 130 deletions crates/core/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,14 @@
use std::collections::HashMap;

use datafusion::common::{Column, ScalarValue, TableReference};
use datafusion::execution::FunctionRegistry;
use datafusion::functions_aggregate::all_default_aggregate_functions;
use datafusion::functions_window::all_default_window_functions;
use datafusion::logical_expr::expr::{
Alias, FieldMetadata, NullTreatment as DFNullTreatment, WindowFunction, WindowFunctionParams,
};
use datafusion::logical_expr::{Expr, ExprFunctionExt, WindowFrame, WindowFunctionDefinition, lit};
use datafusion::logical_expr::expr::{Alias, FieldMetadata, NullTreatment as DFNullTreatment};
use datafusion::logical_expr::{Expr, ExprFunctionExt, lit};
use datafusion::{functions, functions_aggregate, functions_window};
use pyo3::prelude::*;
use pyo3::wrap_pyfunction;

use crate::common::data_type::{NullTreatment, PyScalarValue};
use crate::context::PySessionContext;
use crate::errors::{PyDataFusionError, PyDataFusionResult};
use crate::errors::PyDataFusionResult;
use crate::expr::PyExpr;
use crate::expr::conditional_expr::PyCaseBuilder;
use crate::expr::sort_expr::{PySortExpr, to_sort_expressions};
Expand Down Expand Up @@ -306,126 +300,6 @@ fn when(when: PyExpr, then: PyExpr) -> PyResult<PyCaseBuilder> {
Ok(PyCaseBuilder::new(None).when(when, then))
}

/// Helper function to find the appropriate window function.
///
/// Search procedure:
/// 1) If a session context is provided:
///    1) search User Defined Aggregate Functions (UDAFs)
///    1) search registered window functions
///    1) search registered aggregate functions
/// 1) If no function has been found, search default aggregate functions.
/// 1) Finally, search default window functions.
///
/// Matching against the defaults accepts either the function's primary name
/// or any of its registered aliases.
fn find_window_fn(
name: &str,
ctx: Option<PySessionContext>,
) -> PyDataFusionResult<WindowFunctionDefinition> {
if let Some(ctx) = ctx {
// search UDAFs
let udaf = ctx
.ctx
.udaf(name)
.map(WindowFunctionDefinition::AggregateUDF)
.ok();

if let Some(udaf) = udaf {
return Ok(udaf);
}

let session_state = ctx.ctx.state();

// search registered window functions
let window_fn = session_state
.window_functions()
.get(name)
.map(|f| WindowFunctionDefinition::WindowUDF(f.clone()));

if let Some(window_fn) = window_fn {
return Ok(window_fn);
}

// search registered aggregate functions
let agg_fn = session_state
.aggregate_functions()
.get(name)
.map(|f| WindowFunctionDefinition::AggregateUDF(f.clone()));

if let Some(agg_fn) = agg_fn {
return Ok(agg_fn);
}
}

// search default aggregate functions (by name or alias)
let agg_fn = all_default_aggregate_functions()
.iter()
.find(|v| v.name() == name || v.aliases().contains(&name.to_string()))
.map(|f| WindowFunctionDefinition::AggregateUDF(f.clone()));

if let Some(agg_fn) = agg_fn {
return Ok(agg_fn);
}

// search default window functions (by name or alias)
let window_fn = all_default_window_functions()
.iter()
.find(|v| v.name() == name || v.aliases().contains(&name.to_string()))
.map(|f| WindowFunctionDefinition::WindowUDF(f.clone()));

if let Some(window_fn) = window_fn {
return Ok(window_fn);
}

// Nothing matched anywhere: surface a lookup error to Python.
Err(PyDataFusionError::Common(format!(
"window function `{name}` not found"
)))
}

/// Creates a new Window function expression.
///
/// * `name` - window/aggregate function name, resolved via `find_window_fn`.
/// * `args` - arguments passed to the function.
/// * `partition_by` / `order_by` - optional PARTITION BY / ORDER BY clauses.
/// * `window_frame` - optional explicit frame; when absent a default frame is
///   built, whose shape depends on whether an ORDER BY was supplied.
/// * `filter` - optional FILTER clause expression.
/// * `distinct` - whether to evaluate over DISTINCT inputs.
/// * `ctx` - optional session context used to resolve user-registered
///   functions.
#[allow(clippy::too_many_arguments)]
#[pyfunction]
#[pyo3(signature = (name, args, partition_by=None, order_by=None, window_frame=None, filter=None, distinct=false, ctx=None))]
fn window(
name: &str,
args: Vec<PyExpr>,
partition_by: Option<Vec<PyExpr>>,
order_by: Option<Vec<PySortExpr>>,
window_frame: Option<PyWindowFrame>,
filter: Option<PyExpr>,
distinct: bool,
ctx: Option<PySessionContext>,
) -> PyResult<PyExpr> {
let fun = find_window_fn(name, ctx)?;

// Default frame construction is told whether an ORDER BY is present,
// since that changes the conventional default framing.
let window_frame = window_frame
.map(|w| w.into())
.unwrap_or(WindowFrame::new(order_by.as_ref().map(|v| !v.is_empty())));
let filter = filter.map(|f| f.expr.into());

// Assemble the logical-plan window expression from the unwrapped parts.
Ok(PyExpr {
expr: datafusion::logical_expr::Expr::WindowFunction(Box::new(WindowFunction {
fun,
params: WindowFunctionParams {
args: args.into_iter().map(|x| x.expr).collect::<Vec<_>>(),
partition_by: partition_by
.unwrap_or_default()
.into_iter()
.map(|x| x.expr)
.collect::<Vec<_>>(),
order_by: order_by
.unwrap_or_default()
.into_iter()
.map(|x| x.into())
.collect::<Vec<_>>(),
window_frame,
filter,
distinct,
null_treatment: None,
},
})),
})
}

// Generates a [pyo3] wrapper for associated aggregate functions.
// All of the builder options are exposed to the python internal
// function and we rely on the wrappers to only use those that
Expand Down Expand Up @@ -1186,7 +1060,6 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(self::uuid))?; // Use self to avoid name collision
m.add_wrapped(wrap_pyfunction!(var_pop))?;
m.add_wrapped(wrap_pyfunction!(var_sample))?;
m.add_wrapped(wrap_pyfunction!(window))?;
m.add_wrapped(wrap_pyfunction!(regr_avgx))?;
m.add_wrapped(wrap_pyfunction!(regr_avgy))?;
m.add_wrapped(wrap_pyfunction!(regr_count))?;
Expand Down
15 changes: 7 additions & 8 deletions docs/source/user-guide/common-operations/windows.rst
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,7 @@ it's ``Type 2`` column that are null.
Aggregate Functions
-------------------

You can use any :ref:`Aggregation Function<aggregation>` as a window function. Currently
aggregate functions must use the deprecated
:py:func:`datafusion.functions.window` API but this should be resolved in
DataFusion 42.0 (`Issue Link <https://github.com/apache/datafusion-python/issues/833>`_). Here
You can use any :ref:`Aggregation Function<aggregation>` as a window function. Here
is an example that shows how to compare each pokemon’s attack power with the average attack
power in its ``"Type 1"`` using the :py:func:`datafusion.functions.avg` function.

Expand All @@ -189,10 +186,12 @@ power in its ``"Type 1"`` using the :py:func:`datafusion.functions.avg` function
col('"Name"'),
col('"Attack"'),
col('"Type 1"'),
f.window("avg", [col('"Attack"')])
.partition_by(col('"Type 1"'))
.build()
.alias("Average Attack"),
f.avg(col('"Attack"')).over(
Window(
window_frame=WindowFrame("rows", None, None),
partition_by=[col('"Type 1"')],
)
).alias("Average Attack"),
Comment on lines +189 to +194
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the order_by, window_frame, or null_treatment make sense here.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The window_frame is necessary so that we get the same avg across the entire partition. Otherwise we'd need a sort on it afterwards to make sure it shows up in the "running avg" and I think using the frame is more easy to understand. But you're right about the null treatment and order_by.

)

Available Functions
Expand Down
Loading
Loading