Add changes for compatibility with WASM components and collocated UDF servers #121
Conversation
Defer top-level `import jwt` to function scope in auth.py, management/manager.py, and management/utils.py (jwt unavailable in WASM). Catch OSError in mysql/connection.py getpass handling (pwd module unavailable in WASM). Broaden except clause for IPython import in utils/events.py. Add singlestoredb/functions/ext/wasm/ package with udf_handler.py and numpy_stub.py so componentize-py components can `pip install` this branch and import directly from singlestoredb. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Required by componentize-py to build function-handler components. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Build complete @udf-decorated Python functions from signature metadata and raw function body instead of requiring full source code. This adds dtype-to-Python type mapping and constructs properly annotated functions at registration time. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
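A minimal sketch of the construction described above. The dtype map, helper name, and metadata shape here are illustrative, not the actual API:

```python
# Hypothetical sketch: build an annotated Python function from signature
# metadata and a raw body. Names and the dtype map are illustrative.
DTYPE_TO_PY = {'bigint': int, 'double': float, 'text': str}


def build_function(name, params, returns, body):
    # params: list of (param_name, dtype) pairs; body: raw function body text
    sig = ', '.join(f'{p}: {DTYPE_TO_PY[t].__name__}' for p, t in params)
    src = f'def {name}({sig}) -> {DTYPE_TO_PY[returns].__name__}:\n'
    src += ''.join(f'    {line}\n' for line in body.splitlines())
    ns = {}
    exec(compile(src, f'<{name}>', 'exec'), ns)
    return ns[name]


double_it = build_function('double_it', [('x', 'bigint')], 'bigint', 'return x * 2')
```

The constructed function carries real annotations, so downstream signature inspection sees proper Python types rather than raw source text.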
Heavy optional dependencies (numpy, pandas, polars, pyarrow) were imported at module load time, causing failures in WASM environments where these packages may not be available. This adds a lazy import utility module and converts all eager try/except import patterns to use cached lazy accessors. Type maps in dtypes.py are also converted from module-level dicts to lru_cached factory functions. The pandas DataFrame isinstance check in connection.py is replaced with a duck-type hasattr check to avoid importing pandas at module scope. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace `str | None` with `Optional[str]` to maintain compatibility with Python 3.9 and earlier. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add the call_function_accel function directly to accel.c, implementing a combined load/call/dump operation for UDF function calls. This function handles rowdat_1 deserialization, Python UDF invocation, and result serialization in a single optimized C implementation. Previously this function was injected at build time via a patch script in the wasm-udf-server repository. Moving it into the source tree is a prerequisite for cleaning up the custom componentize-py builder and simplifying the WASM component build process. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
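The combined load → call → dump flow can be illustrated in pure Python with toy serializers (illustrative names; the real implementation operates on rowdat_1 bytes in C):

```python
def call_function(load, func, dump, payload):
    # Combined load/call/dump, mirroring the flow described above:
    # deserialize rows, invoke the Python UDF per row, serialize results.
    row_ids, cols = load(payload)
    results = [func(*row) for row in zip(*cols)]
    return dump(row_ids, results)


# Toy stand-ins for the rowdat_1 codecs: payload is a list of
# (row_id, value) pairs rather than a byte buffer.
def load(payload):
    row_ids = [rid for rid, _ in payload]
    return row_ids, [[v for _, v in payload]]


def dump(row_ids, results):
    return list(zip(row_ids, results))


out = call_function(load, lambda x: x * 2, dump, [(1, 10), (2, 20)])
```

Fusing the three stages into one C call avoids crossing the Python/C boundary once per row, which is the point of the optimization.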
Add resources/build_wasm.sh that cross-compiles the package as a WASM wheel targeting wasm32-wasip2. The script sets up a host venv, configures the WASI SDK toolchain (clang, ar, linker flags), and uses `python -m build` to produce the wheel, then unpacks it into build/. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
numpy is lazy-loaded throughout the codebase via the _lazy_import helpers, so the WASM numpy_stub that patched sys.modules['numpy'] is no longer needed. Delete the stub module and remove its references from udf_handler.py. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add a standalone collocated UDF server package that can run as a drop-in replacement for the Rust wasm-udf-server. Uses pre-fork worker processes (default) for true CPU parallelism, avoiding GIL contention in the C-accelerated call path. Thread pool mode is available via --process-mode thread. Collapse the wasm subpackage into a single wasm.py module since it only contained one class re-exported through __init__.py. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Each forked worker previously created its own independent SharedRegistry and FunctionRegistry. When @@register arrived at a worker, only that worker's local registry was updated — the main process and sibling workers never learned about the new function. Add Unix pipe-based IPC (matching the R UDF server fix): each worker gets a pipe back to the main process. When a worker handles @@register, it writes the registration payload to its pipe. The main process reads it via select.poll(), applies the registration to its own SharedRegistry, then kills and re-forks all workers so they inherit the updated state. Thread mode is unaffected — pipe_write_fd is None and the pipe write is a no-op. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
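The parent-side mechanics can be sketched with a single pipe (the real server keeps one pipe per forked worker; this is a simplified sketch):

```python
import os
import select

# One pipe per worker in the real server; a single pipe suffices here.
r, w = os.pipe()
os.write(w, b'@@register payload')   # worker side: forward the registration

poller = select.poll()               # main process side
poller.register(r, select.POLLIN)
events = poller.poll(100)            # wait up to 100 ms for worker messages
payloads = [os.read(fd, 65536) for fd, ev in events if ev & select.POLLIN]
```

After applying a payload to its own registry, the main process re-forks the workers so every child inherits the updated state.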
Add poll()-based timeout to C recv_exact to avoid the interaction between Python's settimeout() (which sets O_NONBLOCK on the fd) and direct fd-level recv() in the C code. When the fd was non-blocking, recv() returned EAGAIN immediately when no data was available, which the C code treated as an error, closing the connection and causing EPIPE on the client side. - accel.c: Add optional timeout_ms parameter to recv_exact that uses poll(POLLIN) before each recv() call, raising TimeoutError on timeout. Also add mmap_read and mmap_write C helpers for fd-level I/O. - connection.py: Only call settimeout() for the Python fallback path; keep fd blocking for C accel path. Pass 100ms timeout to C recv_exact. Catch TimeoutError instead of socket.timeout. Replace select() loop with timeout-based recv. Add C accel paths for mmap read/write. Add optional per-request profiling via SINGLESTOREDB_UDF_PROFILE=1. - registry.py: Consolidate accel imports (mmap_read, mmap_write, recv_exact) under single _has_accel flag. - wasm.py: Update to use renamed _has_accel flag. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Pull request overview
This PR introduces WASM-compatibility improvements (primarily by lazy-loading heavyweight optional dependencies and moving environment-specific imports into call sites) and adds a new collocated Python UDF server implementation, including a new C-extension hot path to accelerate rowdat_1 decode → Python call → rowdat_1 encode.
Changes:
- Added a WIT interface definition and a WASM build helper script for external UDF component workflows.
- Refactored optional dependency handling (numpy/pandas/polars/pyarrow, IPython, JWT) to be more robust in constrained/WASM-like environments.
- Added a new collocated UDF server (socket + mmap protocol, thread/process modes, dynamic registration) and a C-extension accelerator entry point (call_function_accel).
Reviewed changes
Copilot reviewed 25 out of 25 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| wit/udf.wit | Defines the external UDF WIT interface and exported world. |
| singlestoredb/utils/_lazy_import.py | Adds cached lazy imports for heavy optional deps. |
| singlestoredb/utils/dtypes.py | Converts dtype maps to lazily-evaluated, cached getters. |
| singlestoredb/utils/results.py | Switches result formatting to lazy imports + cached type maps. |
| singlestoredb/utils/events.py | Broadens IPython import failure handling. |
| singlestoredb/converters.py | Uses lazy numpy import in vector converters. |
| singlestoredb/connection.py | Adjusts internal result-to-dict conversion to avoid importing pandas. |
| singlestoredb/mysql/connection.py | Adds WASM-friendly DEFAULT_USER detection (handles OSError). |
| singlestoredb/auth.py | Moves jwt import into call site. |
| singlestoredb/management/utils.py | Moves jwt import into call sites for WASM-friendliness. |
| singlestoredb/management/manager.py | Moves jwt import into is_jwt call site. |
| singlestoredb/functions/dtypes.py | Updates exports to use dtype-map getter functions. |
| singlestoredb/functions/ext/rowdat_1.py | Replaces eager dtype maps with lazy getter functions. |
| singlestoredb/functions/ext/json.py | Replaces eager dtype maps with lazy getter functions. |
| singlestoredb/functions/ext/collocated/* | Adds collocated server, protocol handling, registry, control signals, and WASM adapter. |
| singlestoredb/tests/test_connection.py | Makes pandas string dtype assertions version-tolerant. |
| resources/build_wasm.sh | Adds a build helper for wasm32-wasip2 wheels. |
| pyproject.toml | Adds python-udf-server CLI entry point. |
| accel.c | Adds call_function_accel C hot path and exports it from the extension module. |
When numpy is not available (e.g., WASM), the `np` name is undefined. The has_numpy flag was already used elsewhere but this check was missed when the numpy_stub was removed. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The mmap_read, mmap_write, and recv_exact functions use poll.h, sys/mman.h, and sys/socket.h which are unavailable in WASI. Wrap these includes, function bodies, and PyMethodDef entries with #ifndef __wasi__ guards so the C extension compiles for wasm32-wasip2. The core call_function_accel optimization remains available. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Without this, the accel status log messages ("Using accelerated C call_function_accel loop" / "Using pure Python call_function loop") are silently dropped because no logging handler is configured in the WASM handler path. setup_logging() was only called from __main__.py (collocated server CLI). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The _singlestoredb_accel C extension ifdef'd out the mmap and socket functions for __wasi__ builds, but registry.py imports all four symbols (call_function_accel, mmap_read, mmap_write, recv_exact) in a single try block. The missing exports caused the entire import to fail, silently falling back to the pure Python call_function loop. Add #else stubs that raise NotImplementedError if called, so the symbols are importable and call_function_accel works in WASM. Also capture the accel import error and log it in initialize() for future diagnostics. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Pull request overview
Copilot reviewed 26 out of 26 changed files in this pull request and generated 11 comments.
accel.c:
- Replace empty TODO type stubs with NotImplementedError raises
- Add CHECK_REMAINING macro for bounds checking on buffer reads
- Replace unaligned pointer-cast reads with memcpy for WASM/ARM safety
- Fix double-decref in output error paths (set to NULL before goto)
- Fix Py_None reference leak by removing pre-switch INCREF
- Fix MYSQL_TYPE_NULL consuming an extra byte from next column
- Add PyErr_Format in default switch cases
- Add PyErr_Occurred() checks after PyLong/PyFloat conversions
Python:
- Align list/tuple multi-return handling in registry.py with C path
- Add _write_all_fd helper for partial os.write() handling
- Harden handshake recvmsg: name length bound, ancdata validation,
MSG_CTRUNC check, FD cleanup on error
- Wrap get_context('fork') with platform safety error
- Narrow events.py exception catch to (ImportError, OSError)
- Fix _iquery DataFrame check ordering (check before list())
- Expand setblocking(False) warning comment
- Update WIT and wasm.py docstrings for code parameter
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Pull request overview
Copilot reviewed 30 out of 30 changed files in this pull request and generated 10 comments.
Referenced code:

```python
val = decimal.Decimal(
    data_io.read(slen).decode('utf-8'),
)
```

Same issue as `_load()`: `_load_vectors()` attempts `decimal.Decimal(...)` even when `is_null` is true and the encoded length is 0, which will raise `decimal.InvalidOperation`. Guard the decimal parsing on `not is_null` (or `slen > 0`) so NULL decimal columns can be loaded.

Suggested change:

```python
sval = data_io.read(slen).decode('utf-8')
val = decimal.Decimal(sval) if not is_null else sval
```
Referenced code:

```python
if issubclass(dtype, datetime.date):
    return 'date'
if issubclass(dtype, datetime.time):
    return 'time'
```

`datetime.time` is normalized to `'time'` here, but ROWDAT_1 TIME serialization/deserialization uses `datetime.timedelta` (see `rowdat_1._pack_time`/`_unpack_time`). A UDF annotated with `datetime.time` (or returning it) will be serialized with `_pack_time()` and crash because `datetime.time` has no `total_seconds()`. Either convert `datetime.time` to a timedelta during dump/call paths, or keep `datetime.time` unsupported and raise a clear error.

Suggested change (replacing `return 'time'`):

```python
raise TypeError(
    'unsupported type annotation: datetime.time; '
    'use datetime.timedelta for TIME values, or use `args`/`returns` '
    'on the @udf/@tvf decorator to specify the data type',
)
```
Referenced code (end of the `VECTOR()` docstring and validation):

```python
    SQLString

    """
    if element_type.upper() not in (F16, F32, F64, I8, I16, I32, I64):
```

`VECTOR()` validates `element_type.upper()` but then emits the SQL using the original `element_type` string. If callers pass a lowercase value (e.g. `"f32"`), validation passes but the SQL becomes `VECTOR(n, f32)`, which is likely invalid. Normalize `element_type = element_type.upper()` before formatting the SQL string.

Suggested change:

```python
element_type = element_type.upper()
if element_type not in (F16, F32, F64, I8, I16, I32, I64):
```
Referenced code:

```python
sock_path = self.config['socket']
if os.path.exists(sock_path):
    os.unlink(sock_path)
```

`_bind_socket()` unconditionally unlinks any existing path. If `sock_path` points to a regular file, this will delete it. Consider verifying the existing path is actually a Unix domain socket (e.g., via `stat.S_ISSOCK(os.stat(...).st_mode)`) before unlinking, and raise a clear error otherwise.
Referenced code:

```c
if (!py_dec) goto error;
u64 = (uint64_t)py_dec;
memcpy(out_cols[i] + j * 8, &u64, 8);
CHECKRC(PyDict_SetItem(py_objs, PyLong_FromUnsignedLongLong(u64), py_dec));
Py_CLEAR(py_dec);
```

`PyDict_SetItem(py_objs, PyLong_FromUnsignedLongLong(u64), py_dec)` leaks the temporary key object because `PyDict_SetItem` does not steal references. Create the key `PyObject*` separately, pass it to `PyDict_SetItem`, then `Py_DECREF` it. This matters on the hot path because it will leak one Python int per non-null DECIMAL/DATE/TIME/DATETIME cell.
Referenced diff:

```python
    ft.LONG, -ft.LONG, ft.LONGLONG, -ft.LONGLONG,
])
string_types = set([15, 245, 246, 247, 248, 249, 250, 251, 252, 253, 254])
```

`string_types` includes 246 (MySQL NEWDECIMAL), which will cause decimal columns to be treated as strings (and `binary_types` will include -246). This will mis-parse/mis-dump DECIMAL/NEWDECIMAL data; remove 246 from `string_types` and rely on `decimal_types` for DECIMAL handling.

Suggested change:

```python
string_types = set([15, 245, 247, 248, 249, 250, 251, 252, 253, 254])
```
Referenced code:

```python
val = decimal.Decimal(
    data_io.read(slen).decode('utf-8'),
)
```

Decimal decoding happens even when the value is NULL. For NULL decimals `_dump()` writes length=0, but `_load()` always calls `decimal.Decimal('')`, which raises `decimal.InvalidOperation`. Skip parsing when `is_null` (or when `slen == 0`), and set `val` to a safe placeholder so NULLs round-trip correctly.

Suggested change:

```python
dec_bytes = data_io.read(slen)
if is_null or slen == 0:
    val = None
else:
    val = decimal.Decimal(dec_bytes.decode('utf-8'))
```
Referenced code:

```python
# Decimal
if issubclass(dtype, decimal.Decimal):
    return 'decimal'
```

New/expanded dtype support for `decimal.Decimal` (and now `datetime.time`) isn't covered by existing tests. Add unit tests (e.g., in `singlestoredb/tests/test_udf.py::test_datetimes` or a new decimal test) to assert the generated SQL/type mapping for `decimal.Decimal` and to define/validate expected behavior for `datetime.time`.
Referenced code:

```python
    namelen,
    socket.CMSG_LEN(2 * fd_model.itemsize),
)
```

Handshake reads the function name using a single `recvmsg(namelen, ...)` call. On a stream socket, `recvmsg` is allowed to return fewer than `namelen` bytes, which would truncate `function_name` and desync the protocol. Validate `len(msg) == namelen` (and/or handle `MSG_TRUNC`) and, if short, read the remaining bytes (without expecting additional FDs) before decoding.

Suggested change:

```python
if flags & getattr(socket, 'MSG_TRUNC', 0):
    logger.warning('Function name data was truncated (MSG_TRUNC)')
    return
if len(msg) < namelen:
    remaining = _recv_exact_py(conn, namelen - len(msg))
    if remaining is None:
        logger.warning(
            'Connection closed while receiving function name',
        )
        return
    msg += remaining
```
Referenced code:

```python
name = '__main__'
compiled = compile(full_code, f'<{name}>', 'exec')

if name in sys.modules:
    module = sys.modules[name]
```

Dynamic function registration executes generated code inside the `__main__` module. This can clobber the server's own `__main__` namespace and cause collisions between registrations/reloads. Use a dedicated/private module name for generated functions (e.g. `singlestoredb.functions.ext.plugin._dynamic_udfs_<counter>`), or create a fresh module per registration and keep it referenced as needed.
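One way to realize this suggestion (the module path and helper name are illustrative):

```python
import itertools
import sys
import types

_counter = itertools.count()


def exec_in_private_module(code):
    # Fresh, privately-named module per registration instead of reusing
    # __main__; the package path below is illustrative.
    name = f'singlestoredb.functions.ext.plugin._dynamic_udfs_{next(_counter)}'
    module = types.ModuleType(name)
    module.__file__ = f'<{name}>'
    sys.modules[name] = module       # keep the module referenced
    exec(compile(code, f'<{name}>', 'exec'), module.__dict__)
    return module


mod = exec_in_private_module('def hello():\n    return "hi"')
```

Each registration gets its own namespace, so reloads cannot clobber earlier registrations or the server's own globals.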
The negated binary blob case list in load_rowdat_1_numpy's sizing pass omitted -MYSQL_TYPE_BLOB while including the other three blob types. The data output pass already handled all four. This mismatch caused columns with negated MYSQL_TYPE_BLOB to fall through the sizing switch unhandled, leading to incorrect data pointer advancement and corrupted parsing. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…est coverage Add missing CHECKSIZE/CHECK_REMAINING guards for MYSQL_TYPE_NULL in both the C accelerator and Python rowdat_1 paths. Refactor decimal and datetime unpacking in rowdat_1.py to properly propagate null values. Expand tests for ext func data parsing, plugin UDF server components, and VECTOR type assertions. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Extract shared datetime encode/decode into static inline helpers, replace repeated 11-case string/binary label blocks with macros, and replace the numpy pre-scan switch with a type descriptor table. Reduces ~500 lines of duplication across 5 functions with zero runtime overhead. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Connection._iquery now applies under2camel to dict results via fix_names, making the second conversion in ShowAccessor._iquery unnecessary. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…lean up tests Fix a bug in json.py _dump_vectors where the null mask was never applied because `m is not None` is always True for boolean mask values. Also add 15 new tests for call_function_accel covering datetime/date/time/decimal types, error paths, and edge cases. Remove leftover debug pprint, fix test data inconsistency, and document @@register security boundary. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
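The mask bug is easy to reproduce in isolation. This sketch shows the presumed shape of the defect: testing `m is not None` instead of the mask value itself makes the branch constant, so the mask has no effect.

```python
mask = [True, False, True]   # True marks a NULL cell
values = [1, 2, 3]

# Buggy: `m is not None` is always True for booleans, so the
# original value is always kept and the null mask is never applied.
buggy = [v if m is not None else None for m, v in zip(mask, values)]

# Fixed: test the mask value's truth directly.
fixed = [None if m else v for m, v in zip(mask, values)]
```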
Pull request overview
Copilot reviewed 33 out of 33 changed files in this pull request and generated 6 comments.
Referenced code:

```c
u64 = (uint64_t)py_dec;
memcpy(out_cols[i] + j * 8, &u64, 8);
CHECKRC(PyDict_SetItem(py_objs, PyLong_FromUnsignedLongLong(u64), py_dec));
Py_CLEAR(py_dec);
```

These `PyDict_SetItem` calls pass a freshly-created `PyLong` key (`PyLong_FromUnsignedLongLong(...)`) without DECREF'ing it afterward. Since `PyDict_SetItem` increments references to both key and value, the temporary key object leaks here (and similarly for other inserted objects). Store the key in a local `PyObject*`, call `PyDict_SetItem`, then `Py_DECREF(key)`, or use `PyDict_SetItemString` with a stable key strategy if possible.
Referenced code:

```python
        f'Expected 2 FDs, got {len(received_fds)}',
    )
    return
```

Handshake reads the function name using a single `recvmsg(namelen, ...)`, but `recvmsg` is not guaranteed to return all requested bytes. If it returns a partial name, `msg.decode()` will succeed with truncated data and the protocol will desync. Treat short reads as an error (e.g., verify `len(msg) == namelen` and abort/close), or loop until the full name is received (while ensuring SCM_RIGHTS fds are only accepted once).

Suggested change:

```python
if len(msg) != namelen:
    logger.warning(
        f'Expected function name of {namelen} bytes, got {len(msg)}',
    )
    return
```
Referenced code:

```python
sock_path = self.config['socket']
if os.path.exists(sock_path):
    os.unlink(sock_path)
```

`_bind_socket` unconditionally unlinks any existing path. If the socket path is user-configurable (often under `/tmp`), this can be exploited to delete arbitrary files (e.g., via a symlink or by replacing the path with a regular file) when the server starts. Before unlinking, validate that the existing path is a Unix domain socket (and ideally owned by the current user), or use a safer strategy like binding to a uniquely generated path and refusing to overwrite non-sockets.
Referenced code:

```python
name = '__main__'
compiled = compile(full_code, f'<{name}>', 'exec')

if name in sys.modules:
    module = sys.modules[name]
else:
    module = types.ModuleType(name)
    module.__file__ = f'<{name}>'
    sys.modules[name] = module
```

`create_function` executes generated code into the `__main__` module when it exists (which it typically will for any running process). This can pollute/overwrite the server's own `__main__` globals and makes dynamic registrations interfere with unrelated code in the process. Use a dedicated module namespace for dynamic UDFs (e.g., a unique module name per registration batch, or a private module like `singlestoredb.functions.ext.plugin._dynamic`), and avoid reusing the process's real `__main__` module.

Suggested change:

```python
name = (
    'singlestoredb.functions.ext.plugin._dynamic.'
    f'{func_name}_{os.urandom(8).hex()}'
)
compiled = compile(full_code, f'<{name}>', 'exec')
module = types.ModuleType(name)
module.__file__ = f'<{name}>'
module.__package__ = 'singlestoredb.functions.ext.plugin._dynamic'
sys.modules[name] = module
```
Referenced code (`singlestoredb/utils/_lazy_import.py`):

```python
from functools import lru_cache
from typing import Any
from typing import Optional


@lru_cache(maxsize=None)
def get_numpy() -> Optional[Any]:
    """Return numpy module or None if not installed."""
    try:
        return importlib.import_module('numpy')
    except ImportError:
        return None


@lru_cache(maxsize=None)
def get_pandas() -> Optional[Any]:
    """Return pandas module or None if not installed."""
    try:
        return importlib.import_module('pandas')
    except ImportError:
        return None


@lru_cache(maxsize=None)
def get_polars() -> Optional[Any]:
    """Return polars module or None if not installed."""
    try:
        return importlib.import_module('polars')
    except ImportError:
        return None


@lru_cache(maxsize=None)
def get_pyarrow() -> Optional[Any]:
    """Return pyarrow module or None if not installed."""
    try:
        return importlib.import_module('pyarrow')
    except ImportError:
        return None
```

The lazy import helpers only catch `ImportError`. In constrained environments (e.g., WASM/WASI, missing shared library loaders, binary wheels built for the wrong platform), importing optional deps often raises `OSError` (or other exceptions) instead. To preserve the intended "optional" behavior, broaden the exception handling (at least `ImportError` + `OSError`) and consider storing/logging the exception for debugging when a module is present but fails to import.

Suggested change:

```python
import logging
from functools import lru_cache
from typing import Any
from typing import Optional

logger = logging.getLogger(__name__)


def _import_optional_module(name: str) -> Optional[Any]:
    """Return imported optional module or None if it cannot be imported."""
    try:
        return importlib.import_module(name)
    except (ImportError, OSError) as exc:
        logger.debug('Optional dependency %r could not be imported: %s', name, exc)
        return None


@lru_cache(maxsize=None)
def get_numpy() -> Optional[Any]:
    """Return numpy module or None if not installed."""
    return _import_optional_module('numpy')


@lru_cache(maxsize=None)
def get_pandas() -> Optional[Any]:
    """Return pandas module or None if not installed."""
    return _import_optional_module('pandas')


@lru_cache(maxsize=None)
def get_polars() -> Optional[Any]:
    """Return polars module or None if not installed."""
    return _import_optional_module('polars')


@lru_cache(maxsize=None)
def get_pyarrow() -> Optional[Any]:
    """Return pyarrow module or None if not installed."""
    return _import_optional_module('pyarrow')
```
Referenced code:

```python
def _pack_time(td: _dt.timedelta) -> int:
    """Pack a timedelta into int64 per rowdat_1 spec."""
    total_us = int(td.total_seconds() * 1_000_000)
    sign = -1 if total_us < 0 else 1
    total_us = abs(total_us)
    us = total_us % 1_000_000
    total_secs = total_us // 1_000_000
    ss = total_secs % 60
    mm = (total_secs // 60) % 60
    hh = total_secs // 3600
    return sign * (hh * 10000 + mm * 100 + ss) * 1_000_000 + (sign * us)
```

`_pack_time` uses `td.total_seconds()` (float) to derive microseconds, which can lose precision for large timedeltas and round-trip incorrectly. Prefer integer arithmetic based on `td.days` / `td.seconds` / `td.microseconds` to compute total microseconds exactly (and keep sign handling consistent).
…ocol hardening Fix PyDict_SetItem key reference leaks in load_rowdat_1_numpy by creating temporary key objects and decrementing after use (7 call sites on hot path). Remove NEWDECIMAL (246) from string_types so decimal_types handler is reachable, returning decimal.Decimal instead of strings. Fix _pack_time to use integer arithmetic instead of float total_seconds(). Reject datetime.time UDF annotations with a clear TypeError (timedelta required). Normalize VECTOR element_type to uppercase before SQL emission. Add recvmsg partial-read check in plugin handshake to prevent protocol desync. Validate socket path before unlink in _bind_socket to prevent arbitrary file deletion. Use private module namespace for dynamic UDF registration instead of __main__. Broaden lazy import exception handling to catch OSError for WASM/WASI environments where optional deps may not raise ImportError. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Required since Python 3.12 for '#' format codes in PyArg_ParseTuple. Without this, mmap_write and other C accelerator functions fail with "PY_SSIZE_T_CLEAN macro must be defined for '#' formats". Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The _discover_udf_functions() method required user plugin modules to import the WASM-specific Plugin class for their @udf functions to be found. This broke native plugin servers where Plugin is irrelevant. Replace the Plugin identity check with a direct scan for the _singlestoredb_attrs marker set by @udf. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Referenced code (`resources/build_wasm.sh`):

```shell
#!/bin/bash

set -eou pipefail
```

WASM build script exits on startup
Medium Severity
The script uses `set -eou pipefail`, which parses `-o` incorrectly and treats `u` as an option name. Bash exits before running the build steps, so `resources/build_wasm.sh` cannot execute its WASM packaging workflow. Use the conventional `set -euo pipefail` instead.
Reviewed by Cursor Bugbot for commit 5f84e84.
Referenced code:

```c
} else {
    py_str = PyUnicode_FromStringAndSize(data, (Py_ssize_t)i64);
    data += i64;
    if (!py_str) goto error;
```

Missing bounds check on decimal payload length
High Severity
The new DECIMAL decoding paths consume `i64` bytes from `data` without validating remaining buffer size. Truncated or malformed rowdat_1 input can advance past the end of the byte buffer, causing out-of-bounds reads and potential crashes in `load_rowdat_1` and `load_rowdat_1_numpy`.
Referenced code:

```python
logger.warning(
    f'Short read on function name: expected {namelen}, '
    f'got {len(msg)}',
)
```

Handshake assumes single recvmsg reads full name
Medium Severity
The handshake reads the function name with one `conn.recvmsg(...)` and treats any short read as protocol failure. On SOCK_STREAM, partial reads are valid, so legitimate requests can be rejected intermittently when the name payload arrives in multiple segments.
The pointer-to-object remap switch only handled string/blob types, so DECIMAL, DATE, TIME, DATETIME, and TIMESTAMP columns returned raw pointer integers instead of Python objects in numpy arrays. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Cursor Bugbot has reviewed your changes and found 1 potential issue. There are 6 total unresolved issues (including 5 from previous reviews).
Reviewed by Cursor Bugbot for commit aeab5ff.
Extract FDs from ancdata eagerly before validation so the try/finally cleanup covers all early-return paths after recvmsg. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>


This PR makes several changes to allow the singlestoredb package to work in WASM environments. Many of these changes, such as lazy loading of numpy, pandas, polars, and pyarrow, benefit standard installations as well. Others defer imports that are needed only in certain environments and are unavailable within WASM.
A new collocated UDF server implementation is also included that uses a high-performance loop in the C extension to parse data and call Python functions on each row. This function is used both by standard collocated servers and WASM-based UDF handlers.
Note
High Risk
High risk due to substantial additions in the C extension (new rowdat_1 type handling, mmap/socket syscalls, and `call_function_accel`) plus a new Unix-socket UDF server that supports dynamic registration via `exec()`. Failures here can impact UDF correctness/performance and introduce new security/compatibility considerations (FD passing, process forking, WASI stubs).
Overview
Adds a new plugin-mode UDF execution path: a Unix-socket server (`functions/ext/plugin`) with a CLI entry point (`python-udf-server` / `python -m singlestoredb.functions.ext.plugin`) supporting thread or pre-fork process pools, control signals (`@@health`, `@@functions`, `@@register`), and shared registry reloading.
Extends the native accelerator (`accel.c`) with a combined `call_function_accel` hot path (parse ROWDAT_1 → call Python → serialize ROWDAT_1), plus fd-level helpers (`mmap_read`, `mmap_write`, `recv_exact`) and broader ROWDAT_1 support for `DECIMAL`, `DATE`, `TIME`, and `DATETIME`/`TIMESTAMP` (also mirrored in Python `rowdat_1.py` and JSON encoding).
Improves WASM/optional-dependency compatibility by lazy-loading numeric/DF backends (switching to `get_numpy_type_map`/`get_polars_type_map`/`get_pyarrow_type_map` and `get_numpy()`), making `jwt` an on-demand import, and generalizing internal result normalization in `Connection._iquery` across pandas/polars/arrow/numpy and iterable outputs. Documentation is updated to describe the new plugin server mode and its env/CLI options.
Reviewed by Cursor Bugbot for commit 25c023e. Bugbot is set up for automated code reviews on this repo.