feat(server): /load backend=polars, /load_expr config parity, per-client search isolation (#851) #882
Open
paddymul wants to merge 2 commits into `main` from `feat/server-load-polars-and-search-isolation`
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,136 @@ | ||
| """Polars counterparts of the pandas-based loaders in ``data_loading``. | ||
| | ||
| Lives in its own module so polars stays an optional dependency — the | ||
| server only imports this when ``/load`` is called with | ||
| ``backend: "polars"``. Mirrors the shape of ``data_loading``: | ||
| | ||
| * :func:`load_file_polars` reads parquet/csv/tsv/json eagerly into a | ||
| ``pl.DataFrame``. | ||
| * :class:`PolarsServerDataflow` is the polars analogue of | ||
| ``ServerDataflow`` — same role in the pipeline, polars-flavored | ||
| analysis / autocleaning / stats / sampling classes (lifted from | ||
| ``PolarsBuckarooInfiniteWidget``). | ||
| * :func:`handle_infinite_request_buckaroo_polars` is the polars | ||
| equivalent of ``handle_infinite_request_buckaroo`` — applies the | ||
| per-client live ``search_string`` as a literal substring match | ||
| on String columns (mirrors ``search_df_str`` semantics from the | ||
| pandas path so the client-facing behaviour is identical). | ||
| """ | ||
| import os | ||
| import traceback | ||
| from io import BytesIO | ||
| | ||
| import polars as pl | ||
| | ||
| from buckaroo.dataflow.dataflow import CustomizableDataflow | ||
| from buckaroo.dataflow.autocleaning import PandasAutocleaning | ||
| from buckaroo.customizations.pl_autocleaning_conf import NoCleaningConfPl | ||
| from buckaroo.pluggable_analysis_framework.df_stats_v2 import PlDfStatsV2 | ||
| from buckaroo.polars_buckaroo import ( | ||
| PLSampling, local_analysis_klasses, prepare_df_for_serialization) | ||
| from buckaroo.serialization_utils import pd_to_obj | ||
| | ||
| | ||
| class PolarsServerSampling(PLSampling): | ||
| """Server-mode polars sampling. Inherits ``PLSampling``'s widget | ||
| defaults but caps pre-stats work at ``pre_limit`` so a multi-million-row | ||
| /load doesn't OOM the stats pipeline.""" | ||
| pre_limit = 1_000_000 | ||
| serialize_limit = -1 # infinite mode — no per-page sample cap | ||
| | ||
| | ||
| class PolarsServerDataflow(CustomizableDataflow): | ||
| """Headless polars dataflow matching ``PolarsBuckarooInfiniteWidget``.""" | ||
| analysis_klasses = local_analysis_klasses | ||
| autocleaning_klass = PandasAutocleaning | ||
| autoclean_conf = tuple([NoCleaningConfPl]) | ||
| DFStatsClass = PlDfStatsV2 | ||
| sampling_klass = PolarsServerSampling | ||
| | ||
| def _df_to_obj(self, df): | ||
| # Matches PolarsBuckarooWidget._df_to_obj — pandas frames pass | ||
| # straight through, polars frames go via to_pandas for the JSON | ||
| # initial-state path. (The hot path is the parquet infinite handler | ||
| # below, not this; this is only the initial empty/seed payload.) | ||
| import pandas as pd | ||
| if isinstance(df, pd.DataFrame): | ||
| return pd_to_obj(self.sampling_klass.serialize_sample(df)) | ||
| return pd_to_obj(self.sampling_klass.serialize_sample(df.to_pandas())) | ||
| | ||
| | ||
| def load_file_polars(path: str) -> pl.DataFrame: | ||
| """Eager polars read. Extension dispatch mirrors :func:`load_file`.""" | ||
| ext = os.path.splitext(path)[1].lower() | ||
| if ext == ".csv": | ||
| return pl.read_csv(path) | ||
| elif ext == ".tsv": | ||
| return pl.read_csv(path, separator="\t") | ||
| elif ext in (".parquet", ".parq"): | ||
| return pl.read_parquet(path) | ||
| elif ext == ".json": | ||
| return pl.read_ndjson(path) | ||
| else: | ||
| raise ValueError(f"Unsupported file format: {ext}") | ||
| | ||
| | ||
| def get_metadata_polars(df: pl.DataFrame, path: str) -> dict: | ||
| columns = [{"name": str(c), "dtype": str(d)} for c, d in zip(df.columns, df.dtypes)] | ||
| return {"path": path, "rows": len(df), "columns": columns} | ||
| | ||
| | ||
| def create_polars_dataflow(df, column_config_overrides=None, extra_grid_config=None, init_sd=None) -> PolarsServerDataflow: | ||
| return PolarsServerDataflow(df, column_config_overrides=column_config_overrides, | ||
| extra_grid_config=extra_grid_config, init_sd=init_sd, skip_main_serial=True) | ||
| | ||
| | ||
| def handle_infinite_request_buckaroo_polars( | ||
| dataflow: PolarsServerDataflow, payload_args: dict, search_string: str = "" | ||
| ) -> tuple[dict, bytes]: | ||
| """Polars analogue of :func:`handle_infinite_request_buckaroo`. | ||
| | ||
| ``search_string`` is the per-client live-typed filter (#838) — | ||
| applied as a literal substring match across all polars ``String`` | ||
| columns. Literal (``literal=True``) so user typing isn't treated | ||
| as regex; this matches the pandas server path's ``search_df_str`` | ||
| semantics. | ||
| """ | ||
| from buckaroo.server.window import clamp_window | ||
| | ||
| _unused, processed_df, merged_sd = dataflow.widget_args_tuple | ||
| if processed_df is None: | ||
| return ({"type": "infinite_resp", "key": payload_args, "data": [], "length": 0}, b"") | ||
| try: | ||
| if search_string: | ||
| string_cols = [c for c, dt in zip(processed_df.columns, processed_df.dtypes) | ||
| if dt == pl.String] | ||
| if string_cols: | ||
| mask = pl.any_horizontal( | ||
| pl.col(c).str.contains(search_string, literal=True) | ||
| for c in string_cols) | ||
| filtered_df = processed_df.filter(mask) | ||
| else: | ||
| filtered_df = processed_df | ||
| else: | ||
| filtered_df = processed_df | ||
| | ||
| start, end = clamp_window( | ||
| payload_args.get("start"), payload_args.get("end"), len(filtered_df)) | ||
| | ||
| sort = payload_args.get("sort") | ||
| if sort: | ||
| ascending = payload_args.get("sort_direction") == "asc" | ||
| converted_sort_column = merged_sd[sort]["orig_col_name"] | ||
| sorted_df = filtered_df.with_row_index().sort( | ||
| converted_sort_column, descending=not ascending) | ||
| slice_df = sorted_df[start:end] | ||
| else: | ||
| slice_df = filtered_df.with_row_index()[start:end] | ||
| | ||
| out = BytesIO() | ||
| prepare_df_for_serialization(slice_df).write_parquet(out, compression="uncompressed") | ||
| parquet_bytes = out.getvalue() | ||
| msg = {"type": "infinite_resp", "key": payload_args, "data": [], "length": len(filtered_df)} | ||
| return msg, parquet_bytes | ||
| except Exception: | ||
| return ({"type": "infinite_resp", "key": payload_args, "data": [], "length": 0, | ||
| "error_info": traceback.format_exc()}, b"") | ||
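
The docstring of `handle_infinite_request_buckaroo_polars` says the search is literal so that user typing isn't treated as regex. The difference can be sketched with the standard library alone. Everything below (`literal_match`, `regex_match`, `search_rows`, the sample rows) is hypothetical and only illustrates the semantics; it is not the polars code path in this diff.

```python
import re


def literal_match(cell, needle):
    """Plain substring test: every character in `needle` is taken as-is."""
    return needle in cell


def regex_match(cell, needle):
    """Regex test: characters such as '.', '(' and '*' become pattern syntax."""
    return re.search(needle, cell) is not None


def search_rows(rows, needle, match):
    """Keep rows where any string cell matches (a row-wise OR across columns)."""
    return [
        row for row in rows
        if any(match(v, needle) for v in row.values() if isinstance(v, str))
    ]


# Hypothetical rows standing in for the String columns of a frame.
rows = [
    {"name": "a.b", "note": "price (usd)"},
    {"name": "axb", "note": "plain"},
]

# Literal search for "a.b" keeps only the row that really contains "a.b".
literal_hits = search_rows(rows, "a.b", literal_match)

# As a regex, "." matches any character, so "axb" is matched as well.
regex_hits = search_rows(rows, "a.b", regex_match)

# A half-typed "(" is not even a valid pattern in regex mode.
try:
    search_rows(rows, "(", regex_match)
    paren_error = None
except re.error as exc:
    paren_error = exc
```

With a live-typed filter, the regex variant would over-match on `.` and fail outright on partial input such as `(`, which is the behaviour the literal match avoids.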
When `POST /load` is called with `backend='polars'` for a `.json` file, this uses the NDJSON reader, so ordinary JSON documents that the existing pandas `/load` path accepts via `pd.read_json` (for example a records array like `[{"a":1}]`) fail even though they have the same `.json` extension. This makes the new backend unexpectedly reject valid JSON inputs; use the regular JSON reader here or reserve `read_ndjson` for an NDJSON-specific extension/option.
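
One possible shape for the second option, as a rough sketch rather than a proposed patch: keep `.json` on the regular JSON reader and route only NDJSON-specific extensions to `read_ndjson`. The names `JSON_READERS`, `json_reader_name` and `load_json_polars` are hypothetical, and treating `.ndjson` and `.jsonl` as the NDJSON extensions is an assumption, not something this PR defines. Polars is imported lazily, in keeping with the module's optional-dependency design.

```python
import os

# Hypothetical extension table: ".json" goes to the regular JSON reader,
# while ".ndjson" / ".jsonl" (assumed extensions) go to the NDJSON reader.
JSON_READERS = {
    ".json": "read_json",
    ".ndjson": "read_ndjson",
    ".jsonl": "read_ndjson",
}


def json_reader_name(path):
    """Return the name of the polars reader to use for `path`."""
    ext = os.path.splitext(path)[1].lower()
    try:
        return JSON_READERS[ext]
    except KeyError:
        raise ValueError(f"Unsupported file format: {ext}") from None


def load_json_polars(path):
    """Read a JSON or NDJSON file with the reader chosen by extension."""
    import polars as pl  # lazy import so polars stays optional

    return getattr(pl, json_reader_name(path))(path)
```

Under this sketch a records-array `.json` file would go through `pl.read_json`, and line-delimited input would need one of the assumed NDJSON extensions.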