-
Notifications
You must be signed in to change notification settings - Fork 150
refactor(context): deduplicate register/read option-building logic #1479
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,7 +16,6 @@ | |
| // under the License. | ||
|
|
||
| use std::collections::{HashMap, HashSet}; | ||
| use std::path::PathBuf; | ||
| use std::ptr::NonNull; | ||
| use std::str::FromStr; | ||
| use std::sync::Arc; | ||
|
|
@@ -456,19 +455,8 @@ impl PySessionContext { | |
| ) -> PyDataFusionResult<()> { | ||
| let options = ListingOptions::new(Arc::new(ParquetFormat::new())) | ||
| .with_file_extension(file_extension) | ||
| .with_table_partition_cols( | ||
| table_partition_cols | ||
| .into_iter() | ||
| .map(|(name, ty)| (name, ty.0)) | ||
| .collect::<Vec<(String, DataType)>>(), | ||
| ) | ||
| .with_file_sort_order( | ||
| file_sort_order | ||
| .unwrap_or_default() | ||
| .into_iter() | ||
| .map(|e| e.into_iter().map(|f| f.into()).collect()) | ||
| .collect(), | ||
| ); | ||
| .with_table_partition_cols(convert_partition_cols(table_partition_cols)) | ||
| .with_file_sort_order(convert_file_sort_order(file_sort_order)); | ||
| let table_path = ListingTableUrl::parse(path)?; | ||
| let resolved_schema: SchemaRef = match schema { | ||
| Some(s) => Arc::new(s.0), | ||
|
|
@@ -831,25 +819,15 @@ impl PySessionContext { | |
| file_sort_order: Option<Vec<Vec<PySortExpr>>>, | ||
| py: Python, | ||
| ) -> PyDataFusionResult<()> { | ||
| let mut options = ParquetReadOptions::default() | ||
| .table_partition_cols( | ||
| table_partition_cols | ||
| .into_iter() | ||
| .map(|(name, ty)| (name, ty.0)) | ||
| .collect::<Vec<(String, DataType)>>(), | ||
| ) | ||
| .parquet_pruning(parquet_pruning) | ||
| .skip_metadata(skip_metadata); | ||
| options.file_extension = file_extension; | ||
| options.schema = schema.as_ref().map(|x| &x.0); | ||
| options.file_sort_order = file_sort_order | ||
| .unwrap_or_default() | ||
| .into_iter() | ||
| .map(|e| e.into_iter().map(|f| f.into()).collect()) | ||
| .collect(); | ||
|
|
||
| let result = self.ctx.register_parquet(name, path, options); | ||
| wait_for_future(py, result)??; | ||
| let options = build_parquet_options( | ||
| table_partition_cols, | ||
| parquet_pruning, | ||
| file_extension, | ||
| skip_metadata, | ||
| &schema, | ||
| file_sort_order, | ||
| ); | ||
| wait_for_future(py, self.ctx.register_parquet(name, path, options))??; | ||
| Ok(()) | ||
| } | ||
|
|
||
|
|
@@ -863,19 +841,17 @@ impl PySessionContext { | |
| options: Option<&PyCsvReadOptions>, | ||
| py: Python, | ||
| ) -> PyDataFusionResult<()> { | ||
| let options = options | ||
| .map(|opts| opts.try_into()) | ||
| .transpose()? | ||
| .unwrap_or_default(); | ||
| let options = convert_csv_options(options)?; | ||
|
|
||
| if path.is_instance_of::<PyList>() { | ||
| let paths = path.extract::<Vec<String>>()?; | ||
| let result = self.register_csv_from_multiple_paths(name, paths, options); | ||
| wait_for_future(py, result)??; | ||
| wait_for_future( | ||
| py, | ||
| self.register_csv_from_multiple_paths(name, paths, options), | ||
| )??; | ||
| } else { | ||
| let path = path.extract::<String>()?; | ||
| let result = self.ctx.register_csv(name, &path, options); | ||
| wait_for_future(py, result)??; | ||
| wait_for_future(py, self.ctx.register_csv(name, &path, options))??; | ||
| } | ||
|
|
||
| Ok(()) | ||
|
|
@@ -892,33 +868,22 @@ impl PySessionContext { | |
| pub fn register_json( | ||
| &self, | ||
| name: &str, | ||
| path: PathBuf, | ||
| path: &str, | ||
| schema: Option<PyArrowType<Schema>>, | ||
| schema_infer_max_records: usize, | ||
| file_extension: &str, | ||
| table_partition_cols: Vec<(String, PyArrowType<DataType>)>, | ||
| file_compression_type: Option<String>, | ||
| py: Python, | ||
| ) -> PyDataFusionResult<()> { | ||
|
Comment on lines 868 to 878
|
||
| let path = path | ||
| .to_str() | ||
| .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?; | ||
|
|
||
| let mut options = JsonReadOptions::default() | ||
| .file_compression_type(parse_file_compression_type(file_compression_type)?) | ||
| .table_partition_cols( | ||
| table_partition_cols | ||
| .into_iter() | ||
| .map(|(name, ty)| (name, ty.0)) | ||
| .collect::<Vec<(String, DataType)>>(), | ||
| ); | ||
| options.schema_infer_max_records = schema_infer_max_records; | ||
| options.file_extension = file_extension; | ||
| options.schema = schema.as_ref().map(|x| &x.0); | ||
|
|
||
| let result = self.ctx.register_json(name, path, options); | ||
| wait_for_future(py, result)??; | ||
|
|
||
| let options = build_json_options( | ||
| table_partition_cols, | ||
| file_compression_type, | ||
| schema_infer_max_records, | ||
| file_extension, | ||
| &schema, | ||
| )?; | ||
| wait_for_future(py, self.ctx.register_json(name, path, options))??; | ||
| Ok(()) | ||
| } | ||
|
|
||
|
|
@@ -931,28 +896,14 @@ impl PySessionContext { | |
| pub fn register_avro( | ||
| &self, | ||
| name: &str, | ||
| path: PathBuf, | ||
| path: &str, | ||
| schema: Option<PyArrowType<Schema>>, | ||
| file_extension: &str, | ||
| table_partition_cols: Vec<(String, PyArrowType<DataType>)>, | ||
| py: Python, | ||
| ) -> PyDataFusionResult<()> { | ||
| let path = path | ||
| .to_str() | ||
| .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?; | ||
|
|
||
| let mut options = AvroReadOptions::default().table_partition_cols( | ||
| table_partition_cols | ||
| .into_iter() | ||
| .map(|(name, ty)| (name, ty.0)) | ||
| .collect::<Vec<(String, DataType)>>(), | ||
| ); | ||
| options.file_extension = file_extension; | ||
| options.schema = schema.as_ref().map(|x| &x.0); | ||
|
|
||
| let result = self.ctx.register_avro(name, path, options); | ||
| wait_for_future(py, result)??; | ||
|
|
||
| let options = build_avro_options(table_partition_cols, file_extension, &schema); | ||
| wait_for_future(py, self.ctx.register_avro(name, path, options))??; | ||
| Ok(()) | ||
| } | ||
|
|
||
|
|
@@ -1054,35 +1005,22 @@ impl PySessionContext { | |
| #[pyo3(signature = (path, schema=None, schema_infer_max_records=1000, file_extension=".json", table_partition_cols=vec![], file_compression_type=None))] | ||
| pub fn read_json( | ||
| &self, | ||
| path: PathBuf, | ||
| path: &str, | ||
| schema: Option<PyArrowType<Schema>>, | ||
| schema_infer_max_records: usize, | ||
| file_extension: &str, | ||
| table_partition_cols: Vec<(String, PyArrowType<DataType>)>, | ||
| file_compression_type: Option<String>, | ||
| py: Python, | ||
| ) -> PyDataFusionResult<PyDataFrame> { | ||
| let path = path | ||
| .to_str() | ||
| .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?; | ||
| let mut options = JsonReadOptions::default() | ||
| .table_partition_cols( | ||
| table_partition_cols | ||
| .into_iter() | ||
| .map(|(name, ty)| (name, ty.0)) | ||
| .collect::<Vec<(String, DataType)>>(), | ||
| ) | ||
| .file_compression_type(parse_file_compression_type(file_compression_type)?); | ||
| options.schema_infer_max_records = schema_infer_max_records; | ||
| options.file_extension = file_extension; | ||
| let df = if let Some(schema) = schema { | ||
| options.schema = Some(&schema.0); | ||
| let result = self.ctx.read_json(path, options); | ||
| wait_for_future(py, result)?? | ||
| } else { | ||
| let result = self.ctx.read_json(path, options); | ||
| wait_for_future(py, result)?? | ||
| }; | ||
| let options = build_json_options( | ||
| table_partition_cols, | ||
| file_compression_type, | ||
| schema_infer_max_records, | ||
| file_extension, | ||
| &schema, | ||
| )?; | ||
| let df = wait_for_future(py, self.ctx.read_json(path, options))??; | ||
| Ok(PyDataFrame::new(df)) | ||
| } | ||
|
|
||
|
|
@@ -1095,23 +1033,15 @@ impl PySessionContext { | |
| options: Option<&PyCsvReadOptions>, | ||
| py: Python, | ||
| ) -> PyDataFusionResult<PyDataFrame> { | ||
| let options = options | ||
| .map(|opts| opts.try_into()) | ||
| .transpose()? | ||
| .unwrap_or_default(); | ||
| let options = convert_csv_options(options)?; | ||
|
|
||
| if path.is_instance_of::<PyList>() { | ||
| let paths = path.extract::<Vec<String>>()?; | ||
| let paths = paths.iter().map(|p| p as &str).collect::<Vec<&str>>(); | ||
| let result = self.ctx.read_csv(paths, options); | ||
| let df = PyDataFrame::new(wait_for_future(py, result)??); | ||
| Ok(df) | ||
| let paths: Vec<String> = if path.is_instance_of::<PyList>() { | ||
| path.extract()? | ||
| } else { | ||
| let path = path.extract::<String>()?; | ||
| let result = self.ctx.read_csv(path, options); | ||
| let df = PyDataFrame::new(wait_for_future(py, result)??); | ||
| Ok(df) | ||
| } | ||
| vec![path.extract()?] | ||
| }; | ||
| let df = wait_for_future(py, self.ctx.read_csv(paths, options))??; | ||
| Ok(PyDataFrame::new(df)) | ||
| } | ||
|
|
||
| #[allow(clippy::too_many_arguments)] | ||
|
|
@@ -1134,25 +1064,15 @@ impl PySessionContext { | |
| file_sort_order: Option<Vec<Vec<PySortExpr>>>, | ||
| py: Python, | ||
| ) -> PyDataFusionResult<PyDataFrame> { | ||
| let mut options = ParquetReadOptions::default() | ||
| .table_partition_cols( | ||
| table_partition_cols | ||
| .into_iter() | ||
| .map(|(name, ty)| (name, ty.0)) | ||
| .collect::<Vec<(String, DataType)>>(), | ||
| ) | ||
| .parquet_pruning(parquet_pruning) | ||
| .skip_metadata(skip_metadata); | ||
| options.file_extension = file_extension; | ||
| options.schema = schema.as_ref().map(|x| &x.0); | ||
| options.file_sort_order = file_sort_order | ||
| .unwrap_or_default() | ||
| .into_iter() | ||
| .map(|e| e.into_iter().map(|f| f.into()).collect()) | ||
| .collect(); | ||
|
|
||
| let result = self.ctx.read_parquet(path, options); | ||
| let df = PyDataFrame::new(wait_for_future(py, result)??); | ||
| let options = build_parquet_options( | ||
| table_partition_cols, | ||
| parquet_pruning, | ||
| file_extension, | ||
| skip_metadata, | ||
| &schema, | ||
| file_sort_order, | ||
| ); | ||
| let df = PyDataFrame::new(wait_for_future(py, self.ctx.read_parquet(path, options))??); | ||
| Ok(df) | ||
| } | ||
|
|
||
|
|
@@ -1166,21 +1086,8 @@ impl PySessionContext { | |
| file_extension: &str, | ||
| py: Python, | ||
| ) -> PyDataFusionResult<PyDataFrame> { | ||
| let mut options = AvroReadOptions::default().table_partition_cols( | ||
| table_partition_cols | ||
| .into_iter() | ||
| .map(|(name, ty)| (name, ty.0)) | ||
| .collect::<Vec<(String, DataType)>>(), | ||
| ); | ||
| options.file_extension = file_extension; | ||
| let df = if let Some(schema) = schema { | ||
| options.schema = Some(&schema.0); | ||
| let read_future = self.ctx.read_avro(path, options); | ||
| wait_for_future(py, read_future)?? | ||
| } else { | ||
| let read_future = self.ctx.read_avro(path, options); | ||
| wait_for_future(py, read_future)?? | ||
| }; | ||
| let options = build_avro_options(table_partition_cols, file_extension, &schema); | ||
| let df = wait_for_future(py, self.ctx.read_avro(path, options))??; | ||
| Ok(PyDataFrame::new(df)) | ||
| } | ||
|
|
||
|
|
@@ -1280,7 +1187,7 @@ impl PySessionContext { | |
| // check if the file extension matches the expected extension | ||
| for path in &table_paths { | ||
| let file_path = path.as_str(); | ||
| if !file_path.ends_with(option_extension.clone().as_str()) && !path.is_collection() { | ||
| if !file_path.ends_with(option_extension.as_str()) && !path.is_collection() { | ||
| return exec_err!( | ||
| "File path '{file_path}' does not match the expected extension '{option_extension}'" | ||
| ); | ||
|
|
@@ -1321,6 +1228,80 @@ pub fn parse_file_compression_type( | |
| }) | ||
| } | ||
|
|
||
| fn convert_csv_options( | ||
| options: Option<&PyCsvReadOptions>, | ||
| ) -> PyDataFusionResult<CsvReadOptions<'_>> { | ||
| Ok(options | ||
| .map(|opts| opts.try_into()) | ||
| .transpose()? | ||
| .unwrap_or_default()) | ||
| } | ||
|
|
||
| fn convert_partition_cols( | ||
| table_partition_cols: Vec<(String, PyArrowType<DataType>)>, | ||
| ) -> Vec<(String, DataType)> { | ||
| table_partition_cols | ||
| .into_iter() | ||
| .map(|(name, ty)| (name, ty.0)) | ||
| .collect() | ||
| } | ||
|
|
||
| fn convert_file_sort_order( | ||
| file_sort_order: Option<Vec<Vec<PySortExpr>>>, | ||
| ) -> Vec<Vec<datafusion::logical_expr::SortExpr>> { | ||
| file_sort_order | ||
| .unwrap_or_default() | ||
| .into_iter() | ||
| .map(|e| e.into_iter().map(|f| f.into()).collect()) | ||
| .collect() | ||
| } | ||
|
|
||
| fn build_parquet_options<'a>( | ||
| table_partition_cols: Vec<(String, PyArrowType<DataType>)>, | ||
| parquet_pruning: bool, | ||
| file_extension: &'a str, | ||
| skip_metadata: bool, | ||
| schema: &'a Option<PyArrowType<Schema>>, | ||
| file_sort_order: Option<Vec<Vec<PySortExpr>>>, | ||
| ) -> ParquetReadOptions<'a> { | ||
| let mut options = ParquetReadOptions::default() | ||
| .table_partition_cols(convert_partition_cols(table_partition_cols)) | ||
| .parquet_pruning(parquet_pruning) | ||
| .skip_metadata(skip_metadata); | ||
| options.file_extension = file_extension; | ||
| options.schema = schema.as_ref().map(|x| &x.0); | ||
| options.file_sort_order = convert_file_sort_order(file_sort_order); | ||
| options | ||
| } | ||
|
|
||
| fn build_json_options<'a>( | ||
| table_partition_cols: Vec<(String, PyArrowType<DataType>)>, | ||
| file_compression_type: Option<String>, | ||
| schema_infer_max_records: usize, | ||
| file_extension: &'a str, | ||
| schema: &'a Option<PyArrowType<Schema>>, | ||
| ) -> Result<JsonReadOptions<'a>, PyErr> { | ||
| let mut options = JsonReadOptions::default() | ||
| .table_partition_cols(convert_partition_cols(table_partition_cols)) | ||
| .file_compression_type(parse_file_compression_type(file_compression_type)?); | ||
| options.schema_infer_max_records = schema_infer_max_records; | ||
| options.file_extension = file_extension; | ||
| options.schema = schema.as_ref().map(|x| &x.0); | ||
| Ok(options) | ||
| } | ||
|
|
||
| fn build_avro_options<'a>( | ||
| table_partition_cols: Vec<(String, PyArrowType<DataType>)>, | ||
| file_extension: &'a str, | ||
| schema: &'a Option<PyArrowType<Schema>>, | ||
| ) -> AvroReadOptions<'a> { | ||
| let mut options = AvroReadOptions::default() | ||
| .table_partition_cols(convert_partition_cols(table_partition_cols)); | ||
| options.file_extension = file_extension; | ||
| options.schema = schema.as_ref().map(|x| &x.0); | ||
| options | ||
| } | ||
|
|
||
| impl From<PySessionContext> for SessionContext { | ||
| fn from(ctx: PySessionContext) -> SessionContext { | ||
| ctx.ctx.as_ref().clone() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changing `path` from `PathBuf` to `&str` in this `#[pymethods]` API is a breaking change for Python callers that previously passed `pathlib.Path` / other `os.PathLike` objects (PyO3 can extract those into `PathBuf`, but not into `&str`). Consider keeping `PathBuf` (and moving the `to_str()` conversion into a helper), or accepting a `Bound<'_, PyAny>` and explicitly supporting both `str` and `os.PathLike` inputs to preserve the existing Python surface area.