Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions buckaroo/customizations/styling.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,24 @@ def style_column(kls, col:str, column_metadata: Any) -> Any:
else:
disp = {'displayer': 'obj'}
base_config['tooltip_config'] = {'tooltip_type':'simple', 'val_column': str(col)}
# Lowcode ops (e.g. Search) contribute highlight metadata via SDResult;
# the JS string displayer reads these from displayer_args.
if disp.get('displayer') == 'string':
for k in ('highlight_phrase', 'highlight_regex', 'highlight_color'):
if k in column_metadata:
disp[k] = column_metadata[k]
# init_sd users may pass the same nested shape they'd use in
# column_config_overrides — e.g. {'comments': {'displayer_args':
# {'max_length': 2000}}}. Shallow-merge that into disp so init_sd
# augments (rather than replaces) styling's computed displayer_args
# AND coexists with op-supplied highlights. Caller wins per-key.
# augments (rather than replaces) styling's computed displayer_args.
# Done BEFORE the highlight branch so a column whose pandas ``_type``
# is 'obj' but which init_sd promotes to ``displayer: 'string'``
# (long-text columns like comments are the canonical case) still
# picks up lowcode-op highlight_phrase. Caller wins per-key.
if isinstance(column_metadata.get('displayer_args'), dict):
disp.update(column_metadata['displayer_args'])
# Lowcode ops (e.g. Search) contribute highlight metadata via SDResult;
# the JS string displayer reads these from displayer_args. Skip
# injecting any key the caller already set above, so an explicit
# init_sd ``displayer_args.highlight_phrase`` still wins.
if disp.get('displayer') == 'string':
for k in ('highlight_phrase', 'highlight_regex', 'highlight_color'):
if k in column_metadata and k not in disp:
disp[k] = column_metadata[k]
base_config['displayer_args'] = disp

# Compute content-aware minWidth
Expand Down
13 changes: 10 additions & 3 deletions buckaroo/server/data_loading.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,16 @@ def _df_to_obj(self, df):
return pd_to_obj(df)


def create_dataflow(df: pd.DataFrame) -> ServerDataflow:
"""Instantiate the full Buckaroo analysis pipeline headlessly."""
return ServerDataflow(df, skip_main_serial=True)
def create_dataflow(df: pd.DataFrame, column_config_overrides=None, extra_grid_config=None, init_sd=None) -> ServerDataflow:
"""Instantiate the full Buckaroo analysis pipeline headlessly.

Accepts the same per-column / per-grid configuration kwargs that
``BuckarooInfiniteWidget`` does so server-mode sessions can match the
look of a notebook widget. ``LoadHandler`` forwards the matching
request-body fields here.
"""
return ServerDataflow(df, column_config_overrides=column_config_overrides, extra_grid_config=extra_grid_config,
init_sd=init_sd, skip_main_serial=True)


def get_buckaroo_display_state(dataflow: ServerDataflow) -> dict:
Expand Down
136 changes: 136 additions & 0 deletions buckaroo/server/data_loading_polars.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
"""Polars counterparts of the pandas-based loaders in ``data_loading``.

Lives in its own module so polars stays an optional dependency — the
server only imports this when ``/load`` is called with
``backend: "polars"``. Mirrors the shape of ``data_loading``:

* :func:`load_file_polars` reads parquet/csv/tsv/json eagerly into a
``pl.DataFrame``.
* :class:`PolarsServerDataflow` is the polars analogue of
``ServerDataflow`` — same role in the pipeline, polars-flavored
analysis / autocleaning / stats / sampling classes (lifted from
``PolarsBuckarooInfiniteWidget``).
* :func:`handle_infinite_request_buckaroo_polars` is the polars
equivalent of ``handle_infinite_request_buckaroo`` — applies the
per-client live ``search_string`` as a literal substring match
on String columns (mirrors ``search_df_str`` semantics from the
pandas path so the client-facing behaviour is identical).
"""
import os
import traceback
from io import BytesIO

import polars as pl

from buckaroo.dataflow.dataflow import CustomizableDataflow
from buckaroo.dataflow.autocleaning import PandasAutocleaning
from buckaroo.customizations.pl_autocleaning_conf import NoCleaningConfPl
from buckaroo.pluggable_analysis_framework.df_stats_v2 import PlDfStatsV2
from buckaroo.polars_buckaroo import (
PLSampling, local_analysis_klasses, prepare_df_for_serialization)
from buckaroo.serialization_utils import pd_to_obj


class PolarsServerSampling(PLSampling):
"""Server-mode polars sampling. Inherits ``PLSampling``'s widget
defaults but caps pre-stats work at ``pre_limit`` so a multi-million-row
/load doesn't OOM the stats pipeline."""
pre_limit = 1_000_000
serialize_limit = -1 # infinite mode — no per-page sample cap


class PolarsServerDataflow(CustomizableDataflow):
"""Headless polars dataflow matching ``PolarsBuckarooInfiniteWidget``."""
analysis_klasses = local_analysis_klasses
autocleaning_klass = PandasAutocleaning
autoclean_conf = tuple([NoCleaningConfPl])
DFStatsClass = PlDfStatsV2
sampling_klass = PolarsServerSampling

def _df_to_obj(self, df):
# Matches PolarsBuckarooWidget._df_to_obj — pandas frames pass
# straight through, polars frames go via to_pandas for the JSON
# initial-state path. (The hot path is the parquet infinite handler
# below, not this; this is only the initial empty/seed payload.)
import pandas as pd
if isinstance(df, pd.DataFrame):
return pd_to_obj(self.sampling_klass.serialize_sample(df))
return pd_to_obj(self.sampling_klass.serialize_sample(df.to_pandas()))


def load_file_polars(path: str) -> pl.DataFrame:
"""Eager polars read. Extension dispatch mirrors :func:`load_file`."""
ext = os.path.splitext(path)[1].lower()
if ext == ".csv":
return pl.read_csv(path)
elif ext == ".tsv":
return pl.read_csv(path, separator="\t")
elif ext in (".parquet", ".parq"):
return pl.read_parquet(path)
elif ext == ".json":
return pl.read_ndjson(path)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Load regular JSON files with the polars backend

When POST /load is called with backend='polars' for a .json file, this uses the NDJSON reader, so ordinary JSON documents that the existing pandas /load path accepts via pd.read_json (for example a records array like [{"a":1}]) fail even though they have the same .json extension. This makes the new backend unexpectedly reject valid JSON inputs; use the regular JSON reader here or reserve read_ndjson for an NDJSON-specific extension/option.

Useful? React with 👍 / 👎.

else:
raise ValueError(f"Unsupported file format: {ext}")


def get_metadata_polars(df: pl.DataFrame, path: str) -> dict:
columns = [{"name": str(c), "dtype": str(d)} for c, d in zip(df.columns, df.dtypes)]
return {"path": path, "rows": len(df), "columns": columns}


def create_polars_dataflow(df, column_config_overrides=None, extra_grid_config=None, init_sd=None) -> PolarsServerDataflow:
return PolarsServerDataflow(df, column_config_overrides=column_config_overrides,
extra_grid_config=extra_grid_config, init_sd=init_sd, skip_main_serial=True)


def handle_infinite_request_buckaroo_polars(
dataflow: PolarsServerDataflow, payload_args: dict, search_string: str = ""
) -> tuple[dict, bytes]:
"""Polars analogue of :func:`handle_infinite_request_buckaroo`.

``search_string`` is the per-client live-typed filter (#838) —
applied as a literal substring match across all polars ``String``
columns. Literal (``literal=True``) so user typing isn't treated
as regex; this matches the pandas server path's ``search_df_str``
semantics.
"""
from buckaroo.server.window import clamp_window

_unused, processed_df, merged_sd = dataflow.widget_args_tuple
if processed_df is None:
return ({"type": "infinite_resp", "key": payload_args, "data": [], "length": 0}, b"")
try:
if search_string:
string_cols = [c for c, dt in zip(processed_df.columns, processed_df.dtypes)
if dt == pl.String]
if string_cols:
mask = pl.any_horizontal(
pl.col(c).str.contains(search_string, literal=True)
for c in string_cols)
filtered_df = processed_df.filter(mask)
else:
filtered_df = processed_df
else:
filtered_df = processed_df

start, end = clamp_window(
payload_args.get("start"), payload_args.get("end"), len(filtered_df))

sort = payload_args.get("sort")
if sort:
ascending = payload_args.get("sort_direction") == "asc"
converted_sort_column = merged_sd[sort]["orig_col_name"]
sorted_df = filtered_df.with_row_index().sort(
converted_sort_column, descending=not ascending)
slice_df = sorted_df[start:end]
else:
slice_df = filtered_df.with_row_index()[start:end]

out = BytesIO()
prepare_df_for_serialization(slice_df).write_parquet(out, compression="uncompressed")
parquet_bytes = out.getvalue()
msg = {"type": "infinite_resp", "key": payload_args, "data": [], "length": len(filtered_df)}
return msg, parquet_bytes
except Exception:
return ({"type": "infinite_resp", "key": payload_args, "data": [], "length": 0,
"error_info": traceback.format_exc()}, b"")
111 changes: 91 additions & 20 deletions buckaroo/server/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def _parse_request_body(self) -> dict:
def _validate_request(self, body: dict) -> tuple:
"""Validate and extract session_id, path, mode, prompt, no_browser, and component_config from request.

Returns (session_id, path, mode, prompt, no_browser, component_config) or a tuple of Nones on error.
Returns (session_id, path, mode, prompt, no_browser, component_config, backend) or a tuple of Nones on error.

``session`` is optional — when omitted the server mints a UUID and
returns it in the response. Lets Tauri/Electron-style hosts call /load
Expand All @@ -149,7 +149,7 @@ def _validate_request(self, body: dict) -> tuple:
if not path:
self.set_status(400)
self.write({"error": "Missing 'path'"})
return None, None, None, None, None, None
return None, None, None, None, None, None, None

if not session_id:
session_id = uuid.uuid4().hex
Expand All @@ -158,7 +158,12 @@ def _validate_request(self, body: dict) -> tuple:
prompt = body.get("prompt", "")
no_browser = bool(body.get("no_browser", False))
component_config = body.get("component_config")
return session_id, path, mode, prompt, no_browser, component_config
# ``backend`` is mode="buckaroo"-only: "pandas" (default) builds a
# ServerDataflow against pandas; "polars" builds a PolarsServerDataflow.
# Validated/branched in ``post`` — silently passed through here so a
# bad value surfaces as a 400 from the loader, not a TypeError.
backend = str(body.get("backend", "pandas")).lower()
return session_id, path, mode, prompt, no_browser, component_config, backend

def _load_lazy_polars(self, session, path: str, ldf, metadata: dict):
"""Set up lazy polars session state."""
Expand All @@ -175,14 +180,20 @@ def _load_lazy_polars(self, session, path: str, ldf, metadata: dict):
return True

def _push_state_to_clients(self, session, metadata: dict):
"""Push updated state to all connected WebSocket clients."""
"""Push updated state to all connected WebSocket clients.

Also resets each client's live ``search_string`` (#851): the
dataset just changed, so a term carried over from the previous
load would silently filter the new one.
"""
log.info("push_state path=%s ws_clients=%d", metadata.get("filename", "?"), len(session.ws_clients))
if not session.ws_clients:
return

push_msg = json.dumps(build_state_message(session, metadata=metadata))
for client in list(session.ws_clients):
try:
client.search_string = ""
client.write_message(push_msg)
except Exception:
session.ws_clients.discard(client)
Expand All @@ -195,6 +206,38 @@ def _handle_browser_window(self, session_id: str) -> str:
port = self.application.settings["port"]
return find_or_create_session_window(session_id, port, reload_if_found=True)

def _load_polars_with_error_handling(self, path: str):
"""Eager polars load for ``backend='polars'``. Errors share the
same shape as the pandas loader so the response surface is
identical from the client's POV."""
try:
from buckaroo.server.data_loading_polars import load_file_polars, get_metadata_polars
df = load_file_polars(path)
metadata = get_metadata_polars(df, path)
return df, metadata
except FileNotFoundError:
self.set_status(404)
self.write({"error_code": "file_not_found", "message": f"File not found: {path}"})
return None, None
except ImportError:
self.set_status(400)
self.write({"error_code": "missing_dependency",
"message": "backend='polars' requires polars to be installed"})
return None, None
except ValueError as e:
self.set_status(400)
self.write({"error_code": "invalid_file", "message": str(e)})
return None, None
except Exception:
tb = traceback.format_exc()
log.error("polars load error path=%s: %s", path, tb)
resp: dict = {"error_code": "load_error", "message": "Failed to load file"}
if _BUCKAROO_DEBUG:
resp["details"] = tb
self.set_status(500)
self.write(resp)
return None, None

def _load_file_with_error_handling(self, path: str, is_lazy: bool):
"""Load file and handle errors. Returns (file_obj, metadata) or (None, None)."""
try:
Expand Down Expand Up @@ -229,30 +272,49 @@ async def post(self):
if body is None:
return

session_id, path, mode, prompt, no_browser, component_config = self._validate_request(body)
session_id, path, mode, prompt, no_browser, component_config, backend = self._validate_request(body)
if session_id is None:
return

# ``backend`` only affects mode="buckaroo". Reject early so a
# mismatched lazy/viewer request doesn't silently look fine.
if backend not in ("pandas", "polars"):
self.set_status(400)
self.write({"error_code": "invalid_backend",
"message": f"backend must be 'pandas' or 'polars', got {backend!r}"})
return
if backend == "polars" and mode != "buckaroo":
self.set_status(400)
self.write({"error_code": "invalid_backend",
"message": "backend='polars' is only valid with mode='buckaroo'"})
return

# Optional per-column / per-grid configuration that mirrors the
# matching kwargs on BuckarooInfiniteWidget. Only applied for
# mode="buckaroo" where the full dataflow is built (lazy/viewer
# paths don't carry merged_sd / extra_grid_config).
column_config_overrides = body.get("column_config_overrides")
extra_grid_config = body.get("extra_grid_config")
init_sd = body.get("init_sd")

sessions = self.application.settings["sessions"]
session = sessions.get_or_create(session_id, path)
session.mode = mode
# Loading via /load is always pandas — clear any xorq state left
# by a prior /load_expr on the same session so WS dispatch routes
# to the new pandas dataflow rather than a stale xorq one.
session.backend = "pandas"
# Loading via /load is always pandas or polars — clear any xorq state
# left by a prior /load_expr on the same session so WS dispatch routes
# to the new dataflow rather than a stale xorq one.
session.backend = backend
session.xorq_dataflow = None
session.expr = None
# Reset the live-typed row-fetch filter so a search term carried
# over from a prior dataset on this session doesn't silently
# filter the new one (Codex P1 on #839). The client's fresh
# buckaroo_state has search_string="" — keep the server in sync.
session.search_string = ""
session.prompt = prompt
if component_config:
session.component_config = component_config

# Load data in appropriate mode
file_obj, metadata = self._load_file_with_error_handling(path, is_lazy=(mode == "lazy"))
if backend == "polars" and mode == "buckaroo":
file_obj, metadata = self._load_polars_with_error_handling(path)
else:
file_obj, metadata = self._load_file_with_error_handling(path, is_lazy=(mode == "lazy"))
if file_obj is None:
return

Expand All @@ -262,7 +324,13 @@ async def post(self):
session.df = file_obj
session.metadata = metadata
if mode == "buckaroo":
dataflow = create_dataflow(file_obj)
if backend == "polars":
from buckaroo.server.data_loading_polars import create_polars_dataflow
dataflow = create_polars_dataflow(file_obj, column_config_overrides=column_config_overrides,
extra_grid_config=extra_grid_config, init_sd=init_sd)
else:
dataflow = create_dataflow(file_obj, column_config_overrides=column_config_overrides,
extra_grid_config=extra_grid_config, init_sd=init_sd)
session.dataflow = dataflow
buckaroo_state = get_buckaroo_display_state(dataflow)
session.df_display_args = buckaroo_state["df_display_args"]
Expand Down Expand Up @@ -349,6 +417,9 @@ async def post(self):
prompt = body.get("prompt", "")
no_browser = bool(body.get("no_browser", False))
component_config = body.get("component_config")
column_config_overrides = body.get("column_config_overrides")
extra_grid_config = body.get("extra_grid_config")
init_sd = body.get("init_sd")

project_root = body.get("project_root")

Expand All @@ -359,7 +430,9 @@ async def post(self):
+ xorq_loading.load_project_post_processing_klasses(project_root)
if project_root else [])
xorq_dataflow = xorq_loading.XorqServerDataflow(
expr, skip_main_serial=True, extra_klasses=extra_klasses)
expr, skip_main_serial=True, extra_klasses=extra_klasses,
column_config_overrides=column_config_overrides,
extra_grid_config=extra_grid_config, init_sd=init_sd)
metadata = xorq_loading.get_xorq_metadata(xorq_dataflow, build_dir)
except FileNotFoundError:
self.set_status(404)
Expand Down Expand Up @@ -388,9 +461,6 @@ async def post(self):
session.df = None
session.dataflow = None
session.ldf = None
# Reset the live-typed row-fetch filter so a prior term doesn't
# silently filter the freshly loaded expression (Codex P1 on #839).
session.search_string = ""
session.metadata = metadata
session.prompt = prompt
if component_config:
Expand Down Expand Up @@ -422,6 +492,7 @@ async def post(self):
push_msg = json.dumps(build_state_message(session, metadata=metadata))
for client in list(session.ws_clients):
try:
client.search_string = ""
client.write_message(push_msg)
except Exception:
session.ws_clients.discard(client)
Expand Down
Loading