diff --git a/docs/superpowers/plans/2026-06-30-rayon-multithread-verification.md b/docs/superpowers/plans/2026-06-30-rayon-multithread-verification.md new file mode 100644 index 00000000..a9bae62f --- /dev/null +++ b/docs/superpowers/plans/2026-06-30-rayon-multithread-verification.md @@ -0,0 +1,773 @@ +# Rayon Multithread Verification & Thread-Cap Fix — Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Make gvl's rayon-parallel paths runnable and verified under real multithreaded load, fix the `cap_threads()` oversubscription bugs behind issue #263, release the GIL around the rayon FFI calls, and unblock the pre-commit pyrefly hook. + +**Architecture:** Four Python-side changes in `_threads.py` (force-parallel override, CFS-quota-aware CPU detection, `RAYON_NUM_THREADS` overwrite) and `tests/parity/` (pyrefly fix); one Rust change wrapping every parallel FFI entry point in `py.allow_threads`; and two new test modules (forced-parallel equivalence + a spawn-worker stress reproducer). The size gate in `should_parallelize()` is the single chokepoint the tests flip via `GVL_FORCE_PARALLEL`. + +**Tech Stack:** Python 3.10–3.13, Rust + PyO3 + rayon (via maturin), pytest, pixi (`-e dev`). + +**Spec:** `docs/superpowers/specs/2026-06-30-rayon-multithread-verification-design.md` +**Issue:** https://github.com/mcvickerlab/GenVarLoader/issues/263 + +## Global Constraints + +- **Rebuild Rust before any Python test that imports the extension:** `pixi run -e dev maturin develop --release`. `pytest` does NOT rebuild; a stale `.so` silently masks Rust changes. (`cargo test` compiles from source and is exempt.) +- All dev commands run under pixi: `pixi run -e dev `. Platform is linux-64. +- Lint/format cover BOTH trees: `ruff check python/ tests/` and `ruff format python/ tests/`. E501 is ignored. +- Typecheck: `pixi run -e dev typecheck` (pyrefly) must report **zero** errors — the pre-commit hook runs it and blocks commits. +- **No inline `# pyrefly: ignore`** — an ignore that becomes unused in another environment breaks CI via `unused-ignore`. Fix by restructuring code. +- Conventional-commit messages (commitizen). End commit messages with `Co-Authored-By: Claude Opus 4.8 `. +- prek/pre-commit hooks must be installed and must pass **without** `--no-verify` by the end of the work. +- `should_parallelize()` / `_force_parallel()` must read `os.environ` **live** on every call (no import-time capture) so tests can toggle env vars with `monkeypatch.setenv`. + +--- + +## File Structure + +- `python/genvarloader/_threads.py` — MODIFY. Add `_force_parallel()`, `_cgroup_cpu_quota()`; edit `should_parallelize()`, `_detect_cpus()`, `cap_threads()`. Update module docstring. +- `tests/unit/test_threads.py` — MODIFY. Add tests for the three `_threads.py` changes. +- `tests/parity/test_gen_dataset_goldens.py` — MODIFY. Fix `unbound-name` (lines ~262–270). +- `tests/parity/test_variants_dataset_parity.py` — MODIFY. Fix `unbound-name` (lines ~85–115). +- `src/ffi/mod.rs` — MODIFY. Wrap the 11 parallel FFI entry points in `py.allow_threads`. +- `tests/integration/test_rayon_forced_parallel.py` — CREATE. End-to-end forced-parallel == serial equivalence. +- `tests/integration/test_rayon_stress.py` — CREATE. Spawn-worker stress reproducer (`slow`). + +--- + +## Task 1: `GVL_FORCE_PARALLEL` override + +**Files:** +- Modify: `python/genvarloader/_threads.py` +- Test: `tests/unit/test_threads.py` + +**Interfaces:** +- Produces: `_force_parallel() -> bool` (reads `GVL_FORCE_PARALLEL` live); `should_parallelize(total_bytes: int) -> bool` short-circuits to `True` when `_force_parallel()`. + +- [ ] **Step 1: Write the failing tests** + +Append to `tests/unit/test_threads.py`: + +```python +import pytest + + +@pytest.mark.parametrize("val", ["1", "true", "TRUE", "yes", "on", "On"]) +def test_force_parallel_truthy(monkeypatch, val): + monkeypatch.setenv("GVL_FORCE_PARALLEL", val) + # Below the byte threshold, but forced on → parallel. + assert th.should_parallelize(0) is True + + +@pytest.mark.parametrize("val", ["0", "false", "no", "off", "", "banana"]) +def test_force_parallel_falsy_falls_back_to_threshold(monkeypatch, val): + monkeypatch.setattr(th, "_NUM_THREADS", None) + monkeypatch.delenv("GVL_NUM_THREADS", raising=False) + monkeypatch.setenv("GVL_FORCE_PARALLEL", val) + _constrain_detected_cpus(monkeypatch, 4) + # Not forced → normal size gate applies. + assert th.should_parallelize(0) is False + assert th.should_parallelize(4 * th._MIN_BYTES_PER_THREAD) is True + + +def test_force_parallel_unset(monkeypatch): + monkeypatch.delenv("GVL_FORCE_PARALLEL", raising=False) + monkeypatch.setattr(th, "_NUM_THREADS", None) + monkeypatch.delenv("GVL_NUM_THREADS", raising=False) + _constrain_detected_cpus(monkeypatch, 4) + assert th.should_parallelize(0) is False +``` + +- [ ] **Step 2: Run to verify failure** + +Run: `pixi run -e dev pytest tests/unit/test_threads.py -k force_parallel -v` +Expected: FAIL — `AttributeError`/wrong result (`_force_parallel` / short-circuit not implemented). + +- [ ] **Step 3: Implement** + +In `python/genvarloader/_threads.py`, add near the top after `_NUM_THREADS`: + +```python +_TRUTHY = frozenset({"1", "true", "yes", "on"}) + + +def _force_parallel() -> bool: + """True iff GVL_FORCE_PARALLEL is set to a truthy value (read live).""" + return os.environ.get("GVL_FORCE_PARALLEL", "").strip().lower() in _TRUTHY +``` + +Replace `should_parallelize`: + +```python +def should_parallelize(total_bytes: int) -> bool: + # GVL_FORCE_PARALLEL bypasses the size gate so the multithreaded paths run + # on small inputs (tests, repro harnesses). See issue #263. + if _force_parallel(): + return True + return total_bytes >= num_threads() * _MIN_BYTES_PER_THREAD +``` + +- [ ] **Step 4: Run to verify pass** + +Run: `pixi run -e dev pytest tests/unit/test_threads.py -k force_parallel -v` +Expected: PASS (all parametrizations). + +- [ ] **Step 5: Commit** + +```bash +git add python/genvarloader/_threads.py tests/unit/test_threads.py +git commit -m "feat(threads): add GVL_FORCE_PARALLEL to bypass the size gate" +``` + +--- + +## Task 2: CFS-quota-aware CPU detection + +**Files:** +- Modify: `python/genvarloader/_threads.py` +- Test: `tests/unit/test_threads.py` + +**Interfaces:** +- Produces: `_cgroup_cpu_quota() -> int | None` (effective CPU count from a CFS quota, or `None` when no quota / unreadable). `_detect_cpus()` returns `min(affinity, quota)` when a quota is present. +- Consumes: nothing from Task 1. + +- [ ] **Step 1: Write the failing tests** + +Append to `tests/unit/test_threads.py`: + +```python +import math + + +def test_cgroup_quota_v2_parses_cpu_max(monkeypatch, tmp_path): + f = tmp_path / "cpu.max" + f.write_text("1530000 100000\n") # 15.3 cores + monkeypatch.setattr(th, "_CGROUP_V2_CPU_MAX", f) + monkeypatch.setattr(th, "_CGROUP_V1_QUOTA", tmp_path / "nope_quota") + monkeypatch.setattr(th, "_CGROUP_V1_PERIOD", tmp_path / "nope_period") + assert th._cgroup_cpu_quota() == math.ceil(1530000 / 100000) # 16 + + +def test_cgroup_quota_v2_max_is_none(monkeypatch, tmp_path): + f = tmp_path / "cpu.max" + f.write_text("max 100000\n") # unlimited + monkeypatch.setattr(th, "_CGROUP_V2_CPU_MAX", f) + monkeypatch.setattr(th, "_CGROUP_V1_QUOTA", tmp_path / "nope_quota") + monkeypatch.setattr(th, "_CGROUP_V1_PERIOD", tmp_path / "nope_period") + assert th._cgroup_cpu_quota() is None + + +def test_cgroup_quota_v1_fallback(monkeypatch, tmp_path): + q = tmp_path / "cfs_quota_us" + p = tmp_path / "cfs_period_us" + q.write_text("800000\n") + p.write_text("100000\n") + monkeypatch.setattr(th, "_CGROUP_V2_CPU_MAX", tmp_path / "absent") + monkeypatch.setattr(th, "_CGROUP_V1_QUOTA", q) + monkeypatch.setattr(th, "_CGROUP_V1_PERIOD", p) + assert th._cgroup_cpu_quota() == 8 + + +def test_cgroup_quota_none_when_absent(monkeypatch, tmp_path): + monkeypatch.setattr(th, "_CGROUP_V2_CPU_MAX", tmp_path / "absent1") + monkeypatch.setattr(th, "_CGROUP_V1_QUOTA", tmp_path / "absent2") + monkeypatch.setattr(th, "_CGROUP_V1_PERIOD", tmp_path / "absent3") + assert th._cgroup_cpu_quota() is None + + +def test_detect_cpus_takes_min_of_affinity_and_quota(monkeypatch, tmp_path): + _constrain_detected_cpus(monkeypatch, 16) # affinity reports 16 + monkeypatch.setattr(th, "_cgroup_cpu_quota", lambda: 15) + assert th._detect_cpus() == 15 + + +def test_detect_cpus_ignores_quota_when_none(monkeypatch): + _constrain_detected_cpus(monkeypatch, 16) + monkeypatch.setattr(th, "_cgroup_cpu_quota", lambda: None) + assert th._detect_cpus() == 16 +``` + +- [ ] **Step 2: Run to verify failure** + +Run: `pixi run -e dev pytest tests/unit/test_threads.py -k cgroup or detect_cpus -v` +Expected: FAIL — `_cgroup_cpu_quota` / module path constants not defined. + +- [ ] **Step 3: Implement** + +In `python/genvarloader/_threads.py`, add imports and module-level path constants near the top: + +```python +import math +from pathlib import Path + +# cgroup CPU-quota files (module-level so tests can repoint them). +_CGROUP_V2_CPU_MAX = Path("/sys/fs/cgroup/cpu.max") +_CGROUP_V1_QUOTA = Path("/sys/fs/cgroup/cpu/cpu.cfs_quota_us") +_CGROUP_V1_PERIOD = Path("/sys/fs/cgroup/cpu/cpu.cfs_period_us") + + +def _read_int(path: Path) -> int | None: + try: + return int(path.read_text().strip()) + except (OSError, ValueError): + return None + + +def _cgroup_cpu_quota() -> int | None: + """Effective CPU count implied by a CFS quota, or None if unlimited/unreadable. + + A CFS *quota* (cpu.max / cpu.cfs_quota_us) is invisible to + sched_getaffinity, so a 15.3-core container still reports 16 cores by + affinity. See issue #263. + """ + # cgroup v2: " " or "max ". + try: + raw = _CGROUP_V2_CPU_MAX.read_text().split() + except OSError: + raw = None + if raw and len(raw) == 2: + quota_s, period_s = raw + if quota_s != "max": + try: + quota, period = int(quota_s), int(period_s) + except ValueError: + quota = period = 0 + if quota > 0 and period > 0: + return max(1, math.ceil(quota / period)) + else: + return None # explicitly unlimited + + # cgroup v1 fallback. + quota = _read_int(_CGROUP_V1_QUOTA) + period = _read_int(_CGROUP_V1_PERIOD) + if quota is not None and quota > 0 and period: + return max(1, math.ceil(quota / period)) + return None +``` + +Replace `_detect_cpus`: + +```python +def _detect_cpus() -> int: + try: + affinity = max(1, len(os.sched_getaffinity(0))) # respects cgroup cpuset (Linux) + except AttributeError: + affinity = max(1, os.cpu_count() or 1) + quota = _cgroup_cpu_quota() + if quota is not None: + return max(1, min(affinity, quota)) + return affinity +``` + +- [ ] **Step 4: Run to verify pass** + +Run: `pixi run -e dev pytest tests/unit/test_threads.py -k "cgroup or detect_cpus" -v` +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add python/genvarloader/_threads.py tests/unit/test_threads.py +git commit -m "fix(threads): honor CFS cpu quota in CPU detection (#263)" +``` + +--- + +## Task 3: Overwrite `RAYON_NUM_THREADS` in `cap_threads()` + +**Files:** +- Modify: `python/genvarloader/_threads.py` +- Test: `tests/unit/test_threads.py` + +**Interfaces:** +- Consumes: `_resolve_num_threads()` (existing), `_detect_cpus()` (Task 2). +- Produces: `cap_threads()` sets `os.environ["RAYON_NUM_THREADS"]` unconditionally (overwrite, not `setdefault`). + +- [ ] **Step 1: Write the failing test** + +Append to `tests/unit/test_threads.py`: + +```python +def test_cap_threads_overwrites_ambient_rayon(monkeypatch): + # An ambient RAYON_NUM_THREADS (base image) must NOT win over GVL's count. + monkeypatch.setenv("RAYON_NUM_THREADS", "16") + monkeypatch.setattr(th, "_NUM_THREADS", None) + monkeypatch.setenv("GVL_NUM_THREADS", "4") + n = th.cap_threads() + assert n == 4 + assert os.environ["RAYON_NUM_THREADS"] == "4" + + +def test_cap_threads_sets_when_unset(monkeypatch): + monkeypatch.delenv("RAYON_NUM_THREADS", raising=False) + monkeypatch.setattr(th, "_NUM_THREADS", None) + monkeypatch.setenv("GVL_NUM_THREADS", "3") + th.cap_threads() + assert os.environ["RAYON_NUM_THREADS"] == "3" +``` + +- [ ] **Step 2: Run to verify failure** + +Run: `pixi run -e dev pytest tests/unit/test_threads.py -k cap_threads -v` +Expected: FAIL on `test_cap_threads_overwrites_ambient_rayon` — `setdefault` leaves `"16"`. + +- [ ] **Step 3: Implement** + +In `python/genvarloader/_threads.py`, replace the assignment in `cap_threads`: + +```python +def cap_threads() -> int: + """Resolve worker count once and pin rayon's pool via RAYON_NUM_THREADS. + + Overwrites any ambient RAYON_NUM_THREADS: an inherited value (e.g. from a + base image) must not defeat GVL's cgroup-aware cap (issue #263). Users who + want explicit control set GVL_NUM_THREADS. Must run before the first rust + parallel call (rayon reads RAYON_NUM_THREADS at global-pool init). + Idempotent. + """ + global _NUM_THREADS + if _NUM_THREADS is None: + _NUM_THREADS = _resolve_num_threads() + os.environ["RAYON_NUM_THREADS"] = str(_NUM_THREADS) + return _NUM_THREADS +``` + +- [ ] **Step 4: Run to verify pass** + +Run: `pixi run -e dev pytest tests/unit/test_threads.py -v` +Expected: PASS (whole module, including Task 1 & 2 tests). + +- [ ] **Step 5: Update module docstring & commit** + +In `python/genvarloader/_threads.py`, extend the module docstring to mention the two env knobs: `GVL_NUM_THREADS` (explicit worker count) and `GVL_FORCE_PARALLEL` (bypass the size gate), and that `RAYON_NUM_THREADS` is overwritten with GVL's resolved count. + +```bash +git add python/genvarloader/_threads.py tests/unit/test_threads.py +git commit -m "fix(threads): overwrite ambient RAYON_NUM_THREADS with resolved cap (#263)" +``` + +--- + +## Task 4: Fix pre-existing pyrefly `unbound-name` in parity tests + +**Files:** +- Modify: `tests/parity/test_gen_dataset_goldens.py` +- Modify: `tests/parity/test_variants_dataset_parity.py` + +**Interfaces:** none (test-only correctness fix). + +Root cause: a variable assigned in a `try` whose `except` calls `pytest.skip(...)`; pyrefly does not model `pytest.skip` as `NoReturn`, so it treats the variable as possibly-unbound at later use. Fix: make each skipping `except` branch visibly terminate with a trailing `raise` (unreachable at runtime — `skip` raises — but statically `NoReturn`). + +- [ ] **Step 1: Confirm the current failure** + +Run: `pixi run -e dev typecheck` +Expected: FAIL — 4 `unbound-name` errors (`ds`, `out_numba`, `golden`) in the two files. + +- [ ] **Step 2: Edit `test_gen_dataset_goldens.py`** + +In `test_gen_variants_af`, add a trailing `raise` after each `pytest.skip(...)` inside an `except`: + +```python + try: + ds = ds_base.with_seqs("variants").with_settings(min_af=0.1, max_af=0.9) + except Exception as e: + pytest.skip(f"AF filtering unavailable: {e}") + raise # unreachable (skip raises); tells pyrefly this branch is NoReturn + try: + monkeypatch.setenv("GVL_BACKEND", "numba") + out_numba = ds[:, :] + except KeyError as e: + pytest.skip(f"AF key missing: {e}") + raise # unreachable; NoReturn marker for pyrefly +``` + +- [ ] **Step 3: Edit `test_variants_dataset_parity.py`** + +Apply the same trailing-`raise` fix to both skipping `except` branches (the `with_seqs(...).with_settings(...)` block and the `load_flat_golden(...)` `FileNotFoundError` block): + +```python + try: + ds = ds_base.with_seqs("variants").with_settings(min_af=0.1, max_af=0.9) + except Exception as e: + pytest.skip( + f"AF filtering unavailable on this dataset — skipping compact_keep " + f"exercise ({type(e).__name__}: {e})" + ) + raise # unreachable; NoReturn marker for pyrefly +``` + +```python + try: + golden = _golden.load_flat_golden("ds_variants_af") + except FileNotFoundError: + pytest.skip("ds_variants_af golden not generated (AF unavailable at gen time)") + raise # unreachable; NoReturn marker for pyrefly +``` + +- [ ] **Step 4: Verify typecheck is clean** + +Run: `pixi run -e dev typecheck` +Expected: PASS — zero errors. + +- [ ] **Step 5: Commit (hook must pass without --no-verify)** + +```bash +git add tests/parity/test_gen_dataset_goldens.py tests/parity/test_variants_dataset_parity.py +git commit -m "test(parity): mark pytest.skip except-branches NoReturn for pyrefly" +``` + +Expected: the pre-commit `pyrefly-check` hook passes (no `--no-verify`). + +--- + +## Task 5: Release the GIL around the rayon FFI entry points + +**Files:** +- Modify: `src/ffi/mod.rs` +- Test: `tests/parity/test_rayon_equivalence.py` (existing — used as the correctness gate, no edits) + +**Interfaces:** +- The 11 parallel entry points keep their Python-visible signatures unchanged. Functions lacking a `py` token gain `py: Python<'py>` as the **first** parameter (PyO3 injects it; invisible to Python callers). +- The rayon closures touch **no Python** (verified: the load path is lock-free, operates only on `ndarray` views), so running them under `allow_threads` is sound. + +**The 11 entry points** (by `pub fn`): `get_diffs_sparse`, `intervals_to_tracks`, `reconstruct_haplotypes_from_sparse`, `reconstruct_haplotypes_fused`, `reconstruct_haplotypes_spliced_fused`, `reconstruct_annotated_haplotypes_spliced_fused`, `reconstruct_annotated_haplotypes_fused`, `get_reference`, `shift_and_realign_tracks_sparse`, `tracks_to_intervals`, `intervals_and_realign_track_fused`. + +Rule: resolve every `PyReadonly*/PyReadwrite*` guard to an `ndarray` view (`.as_array()` / `.as_array_mut()`) in locals **before** the closure; capture only the views (they are `Ungil`); keep the guards in the outer scope; run every `into_pyarray(py)` **after** the closure. Two shape templates follow. + +- [ ] **Step 1: Baseline — confirm parity is green before the change** + +Run: +```bash +pixi run -e dev maturin develop --release +pixi run -e dev pytest tests/parity/test_rayon_equivalence.py -v +``` +Expected: PASS (byte-identical serial==parallel==golden). This is the invariant Task 5 must preserve. + +- [ ] **Step 2: Template A — in-place / no-return kernels (add `py`, wrap core)** + +For `intervals_to_tracks` (currently no `py`), transform to: + +```rust +pub fn intervals_to_tracks( + py: Python<'_>, + offset_idxs: PyReadonlyArray1, + starts: PyReadonlyArray1, + itv_starts: PyReadonlyArray1, + itv_ends: PyReadonlyArray1, + itv_values: PyReadonlyArray1, + itv_offsets: PyReadonlyArray1, + mut out: PyReadwriteArray1, + out_offsets: PyReadonlyArray1, + parallel: bool, +) { + let offset_idxs_a = offset_idxs.as_array(); + let starts_a = starts.as_array(); + let itv_starts_a = itv_starts.as_array(); + let itv_ends_a = itv_ends.as_array(); + let itv_values_a = itv_values.as_array(); + let itv_offsets_a = itv_offsets.as_array(); + let out_a = out.as_array_mut(); + let out_offsets_a = out_offsets.as_array(); + py.allow_threads(move || { + intervals::intervals_to_tracks( + offset_idxs_a, starts_a, itv_starts_a, itv_ends_a, itv_values_a, + itv_offsets_a, out_a, out_offsets_a, parallel, + ); + }); +} +``` + +Apply the same shape to: `reconstruct_haplotypes_from_sparse`, `shift_and_realign_tracks_sparse`, `intervals_and_realign_track_fused` (each gains `py: Python<'_>` as the first param; move all `.as_array()/.as_array_mut()` above a `py.allow_threads(move || { })`). + +- [ ] **Step 3: Template B — functions that already have `py` and return arrays** + +For these, run the compute into **owned** Rust arrays inside `allow_threads`, then `into_pyarray(py)` outside. Example for `reconstruct_haplotypes_fused` (the body from `get_diffs_sparse` through the optional RC is all Python-free): + +```rust + // ... resolve all `.as_array()` views into locals (as today) ... + let (out_data, out_offsets_vec) = py.allow_threads(move || { + // Steps 1–4b unchanged, but returning owned arrays instead of calling into_pyarray: + // let diffs = genotypes::get_diffs_sparse(...); + // ... offset loop building out_offsets_vec ... + // let mut out_data = uninit_output(total); + // reconstruct::reconstruct_haplotypes_from_sparse(out_data.view_mut(), ...); + // if let Some(to_rc) = ... { rc_flat_rows_inplace(...); } + (out_data, out_offsets_vec) + }); + (out_data.into_pyarray(py), out_offsets_vec.into_pyarray(py)) +``` + +Apply the same restructuring (compute-in-closure, `into_pyarray` after) to: `get_diffs_sparse`, `reconstruct_haplotypes_spliced_fused`, `reconstruct_annotated_haplotypes_spliced_fused`, `reconstruct_annotated_haplotypes_fused`, `get_reference`, `tracks_to_intervals`. Keep the `geno_offsets.as_array()` + `.row(0)/.row(1)` extraction outside the closure and pass the resulting `ArrayView`s in. + +- [ ] **Step 4: Compile & iterate** + +Run: `pixi run -e dev cargo build --release 2>&1 | rtk err` +Expected: clean build. Common fixes if it errors: +- "closure may outlive / not `Ungil`": ensure no `PyReadonlyArray`/`PyReadwriteArray` guard and no `py` token is captured — only extracted views and POD locals. +- "cannot move out of `out` because borrowed": use `let out_a = out.as_array_mut();` then `move` the view (not `out`) into the closure. +- borrow conflicts on `geno_offsets`: bind `let go = geno_offsets.as_array();` and derive `go.row(0)/go.row(1)` into locals before the closure. + +- [ ] **Step 5: Rebuild and run the parity gate** + +Run: +```bash +pixi run -e dev maturin develop --release +pixi run -e dev pytest tests/parity/test_rayon_equivalence.py -v +pixi run -e dev cargo-test +``` +Expected: PASS — serial==parallel==golden still byte-identical; Rust unit tests green. + +- [ ] **Step 6: Commit** + +```bash +git add src/ffi/mod.rs +git commit -m "perf(ffi): release the GIL around rayon parallel regions" +``` + +--- + +## Task 6: Forced-parallel end-to-end equivalence test + +**Files:** +- Create: `tests/integration/test_rayon_forced_parallel.py` + +**Interfaces:** +- Consumes: `GVL_FORCE_PARALLEL` (Task 1); the rebuilt extension (Task 5); the `snap_dataset` fixture is in `tests/dataset/conftest.py` — this test lives under `tests/integration/`, so it builds its own dataset from the shared session fixtures (`source_bed`, `vcf_dir`, `reference` from `tests/conftest.py`). + +- [ ] **Step 1: Write the test** + +Create `tests/integration/test_rayon_forced_parallel.py`: + +```python +"""Forced-parallel dispatch must be byte-identical to the serial size-gated path. + +GVL_FORCE_PARALLEL=1 makes should_parallelize() return True, so dataset[...] +runs the real rayon path end-to-end on the small test corpus — coverage the +tiny-golden parity suite cannot reach. +""" + +from __future__ import annotations + +from pathlib import Path + +import numpy as np +import pyBigWig +import pytest +from genoray import VCF + +import genvarloader as gvl + + +@pytest.fixture() +def variant_track_dataset(source_bed, vcf_dir, reference, tmp_path: Path): + """A haplotypes+track dataset whose getitem hits reconstruct + intervals_to_tracks.""" + contig_sizes = [("chr1", 2_000_000), ("chr2", 2_000_000)] + bw_paths: dict[str, str] = {} + for i, sample in enumerate(["s0", "s1", "s2"]): + bw_path = tmp_path / f"{sample}.bw" + with pyBigWig.open(str(bw_path), "w") as bw: + bw.addHeader(contig_sizes, maxZooms=0) + value = float(i + 1) + bw.addEntries( + ["chr1", "chr1", "chr2", "chr2"], + [499_990, 1_010_686, 17_320, 1_234_560], + ends=[500_030, 1_010_706, 17_340, 1_234_580], + values=[value, value, value, value], + ) + bw_paths[sample] = str(bw_path) + out = tmp_path / "ds.gvl" + gvl.write( + path=out, + bed=source_bed, + variants=VCF(vcf_dir / "filtered_source.vcf.gz"), + tracks=gvl.BigWigs("5ss", bw_paths), + max_jitter=2, + ) + return out + + +def _materialize(ds): + """Reduce a getitem result to a list of numpy arrays for comparison.""" + out = ds[:, :] + items = out if isinstance(out, tuple) else (out,) + arrays = [] + for it in items: + # Ragged-like objects expose .data; dense are ndarrays already. + arrays.append(np.asarray(getattr(it, "data", it))) + return arrays + + +@pytest.mark.parametrize("seq_kind", ["haplotypes", "variants"]) +def test_forced_parallel_matches_serial(variant_track_dataset, reference, monkeypatch, seq_kind): + open_ds = lambda: gvl.Dataset.open(variant_track_dataset, reference=reference).with_seqs(seq_kind) + + monkeypatch.delenv("GVL_FORCE_PARALLEL", raising=False) + serial = _materialize(open_ds()) + + monkeypatch.setenv("GVL_FORCE_PARALLEL", "1") + parallel = _materialize(open_ds()) + + assert len(serial) == len(parallel) + for s, p in zip(serial, parallel): + np.testing.assert_array_equal(s, p) +``` + +- [ ] **Step 2: Rebuild the extension (if not already current for Task 5)** + +Run: `pixi run -e dev maturin develop --release` + +- [ ] **Step 3: Run the test** + +Run: `pixi run -e dev pytest tests/integration/test_rayon_forced_parallel.py -v` +Expected: PASS. If `.with_seqs("variants")` is unsupported for this fixture, the assertion still runs on the haplotypes parametrization; if a parametrization errors on dataset construction, narrow `seq_kind` to `["haplotypes"]` and note it in the test docstring. + +- [ ] **Step 4: Commit** + +```bash +git add tests/integration/test_rayon_forced_parallel.py +git commit -m "test(integration): forced-parallel getitem == serial (end-to-end)" +``` + +--- + +## Task 7: Spawn-worker stress reproducer + +**Files:** +- Create: `tests/integration/test_rayon_stress.py` + +**Interfaces:** +- Consumes: `GVL_FORCE_PARALLEL` (Task 1); the rebuilt extension (Task 5); the `variant_track_dataset`-style build (repeated here — workers open by path). +- A module-level worker function (picklable under `spawn`) that opens the dataset by path and iterates it. + +- [ ] **Step 1: Write the stress test** + +Create `tests/integration/test_rayon_stress.py`: + +```python +"""Stress reproducer for issue #263: concurrent spawn workers iterating a +Dataset under forced-parallel + oversubscribed rayon must not deadlock. + +A hang (futex-parked workers, per #263) surfaces as future.result(timeout=...) +raising TimeoutError → test failure. Clean completion across repeated launches +is evidence the cause was oversubscription (fixed by the cap_threads changes). +""" + +from __future__ import annotations + +import multiprocessing as mp +import os +from concurrent.futures import ProcessPoolExecutor, TimeoutError as FTimeoutError +from pathlib import Path + +import numpy as np +import pyBigWig +import pytest +from genoray import VCF + +import genvarloader as gvl + +pytestmark = pytest.mark.slow + +N_WORKERS = 5 +ITERS_PER_WORKER = 40 +LAUNCHES = 4 +PER_LAUNCH_TIMEOUT_S = 120 + + +def _iterate_dataset(ds_path: str, reference_path: str, iters: int) -> int: + """Worker body (must be importable/picklable for spawn). Returns bytes touched.""" + # Force the parallel path and oversubscribe: many rayon threads per worker. + os.environ["GVL_FORCE_PARALLEL"] = "1" + os.environ["RAYON_NUM_THREADS"] = "8" + ds = gvl.Dataset.open(Path(ds_path), reference=Path(reference_path)).with_seqs("haplotypes") + total = 0 + n = len(ds) + for _ in range(iters): + out = ds[:n, :] + first = out[0] if isinstance(out, tuple) else out + total += int(np.asarray(getattr(first, "data", first)).size) + return total + + +@pytest.fixture() +def stress_dataset(source_bed, vcf_dir, reference, tmp_path: Path) -> tuple[Path, Path]: + contig_sizes = [("chr1", 2_000_000), ("chr2", 2_000_000)] + bw_paths: dict[str, str] = {} + for i, sample in enumerate(["s0", "s1", "s2"]): + bw_path = tmp_path / f"{sample}.bw" + with pyBigWig.open(str(bw_path), "w") as bw: + bw.addHeader(contig_sizes, maxZooms=0) + v = float(i + 1) + bw.addEntries(["chr1", "chr2"], [499_990, 17_320], + ends=[500_030, 17_340], values=[v, v]) + bw_paths[sample] = str(bw_path) + out = tmp_path / "stress.gvl" + gvl.write( + path=out, bed=source_bed, variants=VCF(vcf_dir / "filtered_source.vcf.gz"), + tracks=gvl.BigWigs("5ss", bw_paths), max_jitter=2, + ) + return out, Path(reference) + + +def test_concurrent_spawn_workers_do_not_deadlock(stress_dataset): + ds_path, ref_path = stress_dataset + ctx = mp.get_context("spawn") + for launch in range(LAUNCHES): + with ProcessPoolExecutor(max_workers=N_WORKERS, mp_context=ctx) as ex: + futs = [ + ex.submit(_iterate_dataset, str(ds_path), str(ref_path), ITERS_PER_WORKER) + for _ in range(N_WORKERS) + ] + try: + results = [f.result(timeout=PER_LAUNCH_TIMEOUT_S) for f in futs] + except FTimeoutError: + pytest.fail( + f"launch {launch}: worker did not finish within " + f"{PER_LAUNCH_TIMEOUT_S}s — likely the #263 rayon deadlock." + ) + assert all(r > 0 for r in results) +``` + +- [ ] **Step 2: Rebuild (ensure current) and run** + +Run: +```bash +pixi run -e dev maturin develop --release +pixi run -e dev pytest tests/integration/test_rayon_stress.py -v -m slow +``` +Expected: PASS (all launches complete under timeout). If it HANGS/fails → the #263 deadlock reproduced under test: **stop and escalate to Task 8** (do not weaken the test to make it pass). + +- [ ] **Step 3: Tune only if flaky-slow (not hanging)** + +If workers complete but the box is too slow for the timeout under CI load, reduce `ITERS_PER_WORKER`/`LAUNCHES` or raise `PER_LAUNCH_TIMEOUT_S` — never remove the timeout (it is the deadlock detector). Keep `N_WORKERS × RAYON_NUM_THREADS` comfortably above the core count so oversubscription is still exercised. + +- [ ] **Step 4: Commit** + +```bash +git add tests/integration/test_rayon_stress.py +git commit -m "test(integration): spawn-worker rayon deadlock stress reproducer (#263)" +``` + +--- + +## Task 8 (contingent): root-cause fix if the stress test reproduces a hang + +**Only if Task 7 reproduces a deadlock after Tasks 1–6.** Use superpowers:systematic-debugging. Given the lock-free audit, investigate in this order and add the minimal fix + a regression assertion: +1. A dependency's rayon usage on the shared global pool during the read path (e.g. `bigtools` track decode) — capture a native backtrace of a parked worker (`py-spy dump --native` / `gdb` on the hung PID; on macOS hand David a script per the profiling-handoff note). +2. rayon global-pool init races under contention — try an explicit, once-only `ThreadPoolBuilder::num_threads(cap).build_global()` seeded from `RAYON_NUM_THREADS` at import. + +Do not write speculative code without a captured backtrace pinning the wait site. If Task 7 stays green across many launches, **skip this task** and record in the PR that oversubscription (fixed in Tasks 2–3) was the driver. + +--- + +## Final integration gate (before PR) + +- [ ] Rebuild: `pixi run -e dev maturin develop --release` +- [ ] Rust: `pixi run -e dev cargo-test` +- [ ] Lint/format: `pixi run -e dev ruff check python/ tests/ && pixi run -e dev ruff format --check python/ tests/` +- [ ] Types: `pixi run -e dev typecheck` (zero errors; hook passes without `--no-verify`) +- [ ] Full tree: `pixi run -e dev pytest tests -q` (covers dataset + unit + parity; shared `_threads.py` change) +- [ ] Slow tier: `pixi run -e dev pytest tests/integration/test_rayon_stress.py -m slow -q` +- [ ] Confirm no `skills/genvarloader/SKILL.md` change needed (env-only knobs, no `__all__` change). +``` diff --git a/docs/superpowers/specs/2026-06-30-rayon-multithread-verification-design.md b/docs/superpowers/specs/2026-06-30-rayon-multithread-verification-design.md new file mode 100644 index 00000000..edc53776 --- /dev/null +++ b/docs/superpowers/specs/2026-06-30-rayon-multithread-verification-design.md @@ -0,0 +1,137 @@ +# Rayon multithread verification harness + thread-cap fix + +**Date:** 2026-06-30 +**Issue:** [#263](https://github.com/mcvickerlab/GenVarLoader/issues/263) — Nondeterministic rayon deadlock when iterating `Dataset` from concurrent spawn worker processes (v0.36.0) +**Status:** design approved, awaiting spec review + +## Problem + +`genvarloader==0.36.0` introduced rayon parallelism into the Rust load path. In production, iterating a `Dataset` concurrently from multiple `spawn`-ed worker processes **intermittently hard-deadlocks** (threads parked on a futex, load ≈ 1.0, 0% GPU). It is nondeterministic and aggravated by CPU oversubscription. Stable only at `tasks_per_gpu=1`. + +The parallel paths are **not exercised under realistic multithreaded load by the test suite**: + +- The end-to-end read tests never cross the `should_parallelize(total_bytes)` byte gate on the small test corpus, so the parallel branch is never taken through the public API. +- `tests/parity/test_rayon_equivalence.py` *does* force `parallel=True` at the FFI boundary, but on tiny golden inputs — so rayon runs effectively single-threaded, with no concurrency and no contention. It proves *logical* serial==parallel equivalence, not *deadlock freedom under load*. + +## Key finding from the code audit + +**gvl's entire Rust load path is lock-free.** A search across `src/**/*.rs` finds **zero** `Mutex`, `RwLock`, `OnceCell`, `lazy_static`, or `Lazy`, and no nested rayon `install`/`broadcast`: + +- `src/intervals.rs`, `src/reconstruct*` and the getitem kernels build **disjoint** per-query output slices via the `split_at_mut` cursor idiom, then `into_par_iter().for_each(...)`. No shared mutable state, no locks. +- `src/bigwig.rs::write_track` uses a `thread_local!` reader cache (not a shared lock) and an unconditional `par_iter` — and is on the **write** path, not the reported read/iterate path. + +Consequence: issue #263's "audit for a lock-across-parallel-region / nested-install / condvar-in-worker" almost certainly comes up **empty inside gvl's own code**. Combined with #263's evidence (spawn → independent per-process pools, nondeterminism, `N×16` threads on a ~15-core cgroup, stable only when unsubscribed), the dominant driver is **CPU oversubscription** that `cap_threads()` fails to prevent, plus the GIL being held across every rayon region. + +Every rayon call today runs **with the GIL held** — there is no `py.allow_threads` anywhere in `src/`. + +## Goals + +1. Make the genuinely-multithreaded paths runnable and verified on the normal corpus (the "disable size thresholding" ask), permanently. +2. Fix the two concrete `cap_threads()` bugs that cause oversubscription (#263 ask #2). +3. Release the GIL around the rayon FFI calls, defensively. +4. Ship a multiprocess stress reproducer that either reproduces the hang (proving it is gvl-triggerable) or provides evidence the cause was oversubscription (now fixed). + +Non-goals: rewriting the getitem dispatch, changing the on-disk format, or touching the write-path `par_iter`. + +## Design + +One bundled PR. Workstreams 1–5 ship unconditionally; #6 (root-cause fix beyond `allow_threads`) is **contingent** on the harness reproducing a hang. + +### 1. Force-parallel override — `python/genvarloader/_threads.py` + +Single chokepoint. Add a `GVL_FORCE_PARALLEL` env var read by `should_parallelize()`: + +```python +def should_parallelize(total_bytes: int) -> bool: + if _force_parallel(): # GVL_FORCE_PARALLEL truthy → always parallel + return True + return total_bytes >= num_threads() * _MIN_BYTES_PER_THREAD +``` + +- Truthy = `{"1", "true", "yes", "on"}` (case-insensitive), matching common env conventions. +- Permanent, documented mechanism (not a throwaway monkeypatch). It is what the regression test flips, and a support lever for reproducing customer issues. +- Symmetry with a force-serial toggle is **out of scope** (YAGNI); the size gate already yields serial by default on small inputs. + +### 2. `cap_threads()` correctness fix — `python/genvarloader/_threads.py` + +Two bugs, both named in #263: + +**(a) `setdefault` → overwrite.** An ambient `RAYON_NUM_THREADS=16` (base image) currently wins in spawn workers, so `cap_threads()` never caps. Change to assign directly: + +```python +os.environ["RAYON_NUM_THREADS"] = str(_NUM_THREADS) # GVL's resolved count wins +``` + +GVL's resolved count wins; users steer explicitly via `GVL_NUM_THREADS`. Still runs before the first rust parallel call, so it takes effect at global-pool init. (Overwriting *after* pool init is a no-op — rayon reads the env var once — so the existing "must run first" contract is unchanged and still documented.) + +**(b) CFS-quota-aware CPU detection.** `_detect_cpus()` uses `sched_getaffinity`, which ignores a CFS *quota* (reports 16 on a 15.3-core budget). Add cgroup quota parsing: + +- cgroup v2: read `/sys/fs/cgroup/cpu.max` → `" "`; if quota is not `"max"`, `quota_cpus = ceil(quota / period)`. +- cgroup v1 fallback: `/sys/fs/cgroup/cpu/cpu.cfs_quota_us` and `cpu.cfs_period_us`; quota `> 0` ⇒ `ceil(quota / period)`. +- Effective count = `max(1, min(affinity_count, quota_cpus))` when a quota is present, else the affinity count. +- All file reads are wrapped so a missing/unreadable cgroup file (non-Linux, cgroup-less) falls back to affinity detection. No hard dependency on cgroup layout. + +Extract the cgroup-quota probe into a small helper (`_cgroup_cpu_quota() -> int | None`) so it is unit-testable by pointing it at fixture files / monkeypatched readers. + +### 3. GIL release around rayon FFI — `src/ffi/mod.rs` + +Wrap each parallel-capable FFI call's Rust core in `py.allow_threads(|| ...)` so the GIL is dropped for the duration of the rayon region. Applies to the parallel entry points: `reconstruct_haplotypes_from_sparse`, `reconstruct_haplotypes_fused`, `shift_and_realign_tracks_sparse`, `get_diffs_sparse`, `intervals_to_tracks`, `tracks_to_intervals`. + +Correctness constraints (must hold, verified by the parity suite staying green): + +- The rayon closures touch **no Python** — confirmed by the lock-free audit; they operate purely on `ndarray` views. Safe to run without the GIL. +- `PyReadonlyArray` / `PyReadwriteArray` borrows are resolved to `ndarray` views **before** entering `allow_threads`, and no Python code runs inside, so numpy buffers are not mutated underneath us. Standard PyO3 pattern. +- Functions that currently take `py: Python` already have the token; those that don't gain a `py: Python` parameter (PyO3 injects it, signature-transparent to Python callers). + +This removes "GIL held across the parallel region" as a variable and lets other Python threads progress during the rust compute — a defensive fix independent of whether the harness reproduces a hang. + +### 4. Multiprocess stress reproducer + forced-parallel equivalence — `tests/` + +**(a) Stress reproducer** (`tests/integration/test_rayon_stress.py`, marked `slow`). Mirrors #263: + +- `multiprocessing.get_context("spawn")` + `concurrent.futures.ProcessPoolExecutor` with N workers. +- Each worker opens its own `Dataset` and iterates it many times. +- Run with `GVL_FORCE_PARALLEL=1` and a per-worker `RAYON_NUM_THREADS` chosen so that `n_workers × threads` comfortably exceeds the available cores — reproducing the #263 oversubscription condition that widens the race window. +- Wrap `future.result(timeout=...)` in a wall-clock timeout. **Hang → `TimeoutError` → test fails = deadlock reproduced.** Clean completion across repeated launches = evidence the cause was oversubscription. +- Parameterize a couple of `(n_workers, launches)` points; keep total runtime bounded so it fits the `slow` tier. + +**(b) Forced-parallel equivalence** (unit/integration, default tier). With `GVL_FORCE_PARALLEL=1`, assert `dataset[...]` output is byte-identical to the serial result (`GVL_FORCE_PARALLEL` unset / small input) across the output modes (haplotypes, tracks, annotated, variants). This reaches what the tiny-golden parity test cannot: real end-to-end parallel dispatch through the public API on the actual corpus. + +**(c) `cap_threads` unit tests** (`tests/unit/test_threads.py`, extend). Cover: `GVL_FORCE_PARALLEL` truthy/falsy parsing; `RAYON_NUM_THREADS` is **overwritten** not preserved; `_cgroup_cpu_quota()` parses v2 `cpu.max` (quota and `"max"`), v1 quota/period, and returns `None` on missing files; effective count = `min(affinity, quota)`. + +### 5. Pre-existing pyrefly hook failures — `tests/parity/` + +The pre-commit `pyrefly` hook currently fails on `unbound-name` errors that predate this work and block every commit: + +- `tests/parity/test_gen_dataset_goldens.py:265,270` — `ds`, `out_numba` +- `tests/parity/test_variants_dataset_parity.py:101,115` — `ds`, `golden` + +Root cause: a variable is assigned inside a `try` whose `except` branch calls `pytest.skip(...)`. pyrefly does not model `pytest.skip` as `NoReturn`, so it treats the `except` path as falling through, leaving the variable possibly-unbound at its later use. + +Fix by making each skipping `except` branch **visibly terminate** so flow analysis sees the variable as bound — e.g. follow `pytest.skip(...)` with a bare `raise` (unreachable at runtime, since `skip` raises; statically marks the branch `NoReturn`). **No inline `# pyrefly: ignore`** — per the env-drift gotcha, inline ignores that become unused in some environments break CI via `unused-ignore`. Verify with `pixi run -e dev typecheck` reporting zero errors, and confirm the pre-commit hook passes without `--no-verify`. + +### 6. Contingent: root-cause fix beyond `allow_threads` + +Only if the stress harness still reproduces a hang **after** the cap fix and `allow_threads`. Driven by systematic-debugging on the reproducer. Suspect order, given the lock-free audit: (a) a dependency's rayon usage on the shared global pool (e.g. bigtools during track reads); (b) rayon global-pool init races under contention. Left as a spec-level placeholder — no speculative code without harness evidence. + +## Verification + +Rust rebuilt (`maturin develop --release`) before any Python test that imports the extension, per the repo's stale-binary caveat. + +- `pixi run -e dev cargo-test` — Rust unit tests (compiles from source). +- `pixi run -e dev pytest tests/parity/test_rayon_equivalence.py -q` — serial==parallel==golden stays byte-identical after `allow_threads`. +- `pixi run -e dev pytest tests/unit/test_threads.py -q` — cap/quota/force-parallel unit tests. +- `pixi run -e dev pytest tests/integration/test_rayon_stress.py -q` — stress reproducer (slow tier; run explicitly). +- `pixi run -e dev typecheck` — zero pyrefly errors; the pre-commit hook passes **without** `--no-verify` (confirms the parity `unbound-name` fixes). +- Full tree `pixi run -e dev pytest tests -q` + `ruff check python/ tests/` before push (shared-code change touches `_threads.py`). + +## Risks & mitigations + +- **`allow_threads` correctness** — mitigated by the lock-free audit (closures touch no Python) and the parity suite remaining byte-identical. If any closure is later found to touch Python, that call stays GIL-held. +- **Overwriting a user's `RAYON_NUM_THREADS`** — intentional per #263; `GVL_NUM_THREADS` is the documented user knob. Documented in the `_threads.py` module docstring. +- **Stress test flakiness / runtime** — bounded launches, `slow` marker, generous timeout; the test asserts *completion*, not timing, so it is not perf-sensitive. +- **Harness fails to reproduce** — that is itself a result: it makes oversubscription the leading explanation and the cap fix the resolution. Recorded in the PR. + +## Public API / skill impact + +`GVL_FORCE_PARALLEL` and the `RAYON_NUM_THREADS`-overwrite behavior are env-level knobs, not `__all__` symbols — no `skills/genvarloader/SKILL.md` public-API change. Document both env vars in the `_threads.py` module docstring; mention `GVL_NUM_THREADS` / `GVL_FORCE_PARALLEL` in user-facing docs if a threading section exists. diff --git a/python/genvarloader/_threads.py b/python/genvarloader/_threads.py index 48d255d9..cdbebcc7 100644 --- a/python/genvarloader/_threads.py +++ b/python/genvarloader/_threads.py @@ -5,21 +5,88 @@ rayon's global pool picks it up on first use. Must run before the first rust parallel call (rayon reads the env var at global-pool init time). Idempotent. + +Environment variables: +- GVL_NUM_THREADS: Set worker count explicitly (default: cgroup cpuset or + os.cpu_count). Overrides cgroup detection. +- GVL_FORCE_PARALLEL: Force parallelization even for small inputs + (default: use size threshold). Set to a truthy value (1, true, yes, on). +- RAYON_NUM_THREADS: Overwritten by cap_threads with GVL's resolved count. + An inherited value (e.g. from a base image) does not win. """ from __future__ import annotations +import math import os +from pathlib import Path _MIN_BYTES_PER_THREAD = 1 << 20 # 1 MiB _NUM_THREADS: int | None = None +_TRUTHY = frozenset({"1", "true", "yes", "on"}) + +# cgroup CPU-quota files (module-level so tests can repoint them). +_CGROUP_V2_CPU_MAX = Path("/sys/fs/cgroup/cpu.max") +_CGROUP_V1_QUOTA = Path("/sys/fs/cgroup/cpu/cpu.cfs_quota_us") +_CGROUP_V1_PERIOD = Path("/sys/fs/cgroup/cpu/cpu.cfs_period_us") + + +def _read_int(path: Path) -> int | None: + try: + return int(path.read_text().strip()) + except (OSError, ValueError): + return None + + +def _cgroup_cpu_quota() -> int | None: + """Effective CPU count implied by a CFS quota, or None if unlimited/unreadable. + + A CFS *quota* (cpu.max / cpu.cfs_quota_us) is invisible to + sched_getaffinity, so a 15.3-core container still reports 16 cores by + affinity. See issue #263. + """ + # cgroup v2: " " or "max ". + try: + raw = _CGROUP_V2_CPU_MAX.read_text().split() + except OSError: + raw = None + if raw and len(raw) == 2: + quota_s, period_s = raw + if quota_s != "max": + try: + quota, period = int(quota_s), int(period_s) + except ValueError: + quota = period = 0 + if quota > 0 and period > 0: + return max(1, math.ceil(quota / period)) + else: + return None # explicitly unlimited + + # cgroup v1 fallback. + quota = _read_int(_CGROUP_V1_QUOTA) + period = _read_int(_CGROUP_V1_PERIOD) + if quota is not None and quota > 0 and period: + return max(1, math.ceil(quota / period)) + return None + + +def _force_parallel() -> bool: + """True iff GVL_FORCE_PARALLEL is set to a truthy value (read live).""" + return os.environ.get("GVL_FORCE_PARALLEL", "").strip().lower() in _TRUTHY + def _detect_cpus() -> int: try: - return max(1, len(os.sched_getaffinity(0))) # respects cgroup cpuset (Linux) + affinity = max( + 1, len(os.sched_getaffinity(0)) + ) # respects cgroup cpuset (Linux) except AttributeError: - return max(1, os.cpu_count() or 1) + affinity = max(1, os.cpu_count() or 1) + quota = _cgroup_cpu_quota() + if quota is not None: + return max(1, min(affinity, quota)) + return affinity def _resolve_num_threads() -> int: @@ -35,13 +102,16 @@ def _resolve_num_threads() -> int: def cap_threads() -> int: """Resolve worker count once and pin rayon's pool via RAYON_NUM_THREADS. - Must run before the first rust parallel call (rayon reads RAYON_NUM_THREADS - at global-pool init). Idempotent. + Overwrites any ambient RAYON_NUM_THREADS: an inherited value (e.g. from a + base image) must not defeat GVL's cgroup-aware cap (issue #263). Users who + want explicit control set GVL_NUM_THREADS. Must run before the first rust + parallel call (rayon reads RAYON_NUM_THREADS at global-pool init). + Idempotent. """ global _NUM_THREADS if _NUM_THREADS is None: _NUM_THREADS = _resolve_num_threads() - os.environ.setdefault("RAYON_NUM_THREADS", str(_NUM_THREADS)) + os.environ["RAYON_NUM_THREADS"] = str(_NUM_THREADS) return _NUM_THREADS @@ -50,4 +120,8 @@ def num_threads() -> int: def should_parallelize(total_bytes: int) -> bool: + # GVL_FORCE_PARALLEL bypasses the size gate so the multithreaded paths run + # on small inputs (tests, repro harnesses). See issue #263. + if _force_parallel(): + return True return total_bytes >= num_threads() * _MIN_BYTES_PER_THREAD diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index b1ca34fd..913af48b 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -49,19 +49,31 @@ pub fn get_diffs_sparse<'py>( parallel: bool, ) -> Bound<'py, PyArray2> { let go = geno_offsets.as_array(); - let diffs = genotypes::get_diffs_sparse( - geno_offset_idx.as_array(), - geno_v_idxs.as_array(), - go.row(0), - go.row(1), - ilens.as_array(), - keep.as_ref().map(|a| a.as_array()), - keep_offsets.as_ref().map(|a| a.as_array()), - q_starts.as_ref().map(|a| a.as_array()), - q_ends.as_ref().map(|a| a.as_array()), - v_starts.as_ref().map(|a| a.as_array()), - parallel, - ); + let go_starts = go.row(0); + let go_stops = go.row(1); + let geno_offset_idx_a = geno_offset_idx.as_array(); + let geno_v_idxs_a = geno_v_idxs.as_array(); + let ilens_a = ilens.as_array(); + let keep_a = keep.as_ref().map(|a| a.as_array()); + let keep_offsets_a = keep_offsets.as_ref().map(|a| a.as_array()); + let q_starts_a = q_starts.as_ref().map(|a| a.as_array()); + let q_ends_a = q_ends.as_ref().map(|a| a.as_array()); + let v_starts_a = v_starts.as_ref().map(|a| a.as_array()); + let diffs = py.detach(move || { + genotypes::get_diffs_sparse( + geno_offset_idx_a, + geno_v_idxs_a, + go_starts, + go_stops, + ilens_a, + keep_a, + keep_offsets_a, + q_starts_a, + q_ends_a, + v_starts_a, + parallel, + ) + }); diffs.into_pyarray(py) } @@ -69,6 +81,7 @@ pub fn get_diffs_sparse<'py>( #[pyfunction] #[allow(clippy::too_many_arguments)] pub fn intervals_to_tracks( + py: Python<'_>, offset_idxs: PyReadonlyArray1, starts: PyReadonlyArray1, itv_starts: PyReadonlyArray1, @@ -79,17 +92,27 @@ pub fn intervals_to_tracks( out_offsets: PyReadonlyArray1, parallel: bool, ) { - intervals::intervals_to_tracks( - offset_idxs.as_array(), - starts.as_array(), - itv_starts.as_array(), - itv_ends.as_array(), - itv_values.as_array(), - itv_offsets.as_array(), - out.as_array_mut(), - out_offsets.as_array(), - parallel, - ); + let offset_idxs_a = offset_idxs.as_array(); + let starts_a = starts.as_array(); + let itv_starts_a = itv_starts.as_array(); + let itv_ends_a = itv_ends.as_array(); + let itv_values_a = itv_values.as_array(); + let itv_offsets_a = itv_offsets.as_array(); + let out_a = out.as_array_mut(); + let out_offsets_a = out_offsets.as_array(); + py.detach(move || { + intervals::intervals_to_tracks( + offset_idxs_a, + starts_a, + itv_starts_a, + itv_ends_a, + itv_values_a, + itv_offsets_a, + out_a, + out_offsets_a, + parallel, + ); + }); } /// Exonic keep-mask (see `genotypes::choose_exonic_variants`). Returns @@ -484,6 +507,7 @@ pub fn assemble_variant_buffers_i32<'py>( #[pyfunction] #[allow(clippy::too_many_arguments)] pub fn reconstruct_haplotypes_from_sparse( + py: Python<'_>, mut out: PyReadwriteArray1, out_offsets: PyReadonlyArray1, regions: PyReadonlyArray2, @@ -506,28 +530,48 @@ pub fn reconstruct_haplotypes_from_sparse( ) { use crate::reconstruct; let go = geno_offsets.as_array(); - reconstruct::reconstruct_haplotypes_from_sparse( - out.as_array_mut(), - out_offsets.as_array(), - regions.as_array(), - shifts.as_array(), - geno_offset_idx.as_array(), - go.row(0), - go.row(1), - geno_v_idxs.as_array(), - v_starts.as_array(), - ilens.as_array(), - alt_alleles.as_array(), - alt_offsets.as_array(), - ref_.as_array(), - ref_offsets.as_array(), - pad_char, - keep.as_ref().map(|k| k.as_array()), - keep_offsets.as_ref().map(|ko| ko.as_array()), - annot_v_idxs.as_mut().map(|a| a.as_array_mut()), - annot_ref_pos.as_mut().map(|a| a.as_array_mut()), - parallel, - ); + let go_starts = go.row(0); + let go_stops = go.row(1); + let out_a = out.as_array_mut(); + let out_offsets_a = out_offsets.as_array(); + let regions_a = regions.as_array(); + let shifts_a = shifts.as_array(); + let geno_offset_idx_a = geno_offset_idx.as_array(); + let geno_v_idxs_a = geno_v_idxs.as_array(); + let v_starts_a = v_starts.as_array(); + let ilens_a = ilens.as_array(); + let alt_alleles_a = alt_alleles.as_array(); + let alt_offsets_a = alt_offsets.as_array(); + let ref_a = ref_.as_array(); + let ref_offsets_a = ref_offsets.as_array(); + let keep_a = keep.as_ref().map(|k| k.as_array()); + let keep_offsets_a = keep_offsets.as_ref().map(|ko| ko.as_array()); + let annot_v_idxs_a = annot_v_idxs.as_mut().map(|a| a.as_array_mut()); + let annot_ref_pos_a = annot_ref_pos.as_mut().map(|a| a.as_array_mut()); + py.detach(move || { + reconstruct::reconstruct_haplotypes_from_sparse( + out_a, + out_offsets_a, + regions_a, + shifts_a, + geno_offset_idx_a, + go_starts, + go_stops, + geno_v_idxs_a, + v_starts_a, + ilens_a, + alt_alleles_a, + alt_offsets_a, + ref_a, + ref_offsets_a, + pad_char, + keep_a, + keep_offsets_a, + annot_v_idxs_a, + annot_ref_pos_a, + parallel, + ); + }); } /// Fused haplotypes __getitem__ kernel (Task 13). @@ -584,95 +628,106 @@ pub fn reconstruct_haplotypes_fused<'py>( let geno_v_idxs_a = geno_v_idxs.as_array(); let v_starts_a = v_starts.as_array(); let ilens_a = ilens.as_array(); + let alt_alleles_a = alt_alleles.as_array(); + let alt_offsets_a = alt_offsets.as_array(); + let ref_a = ref_.as_array(); + let ref_offsets_a = ref_offsets.as_array(); + let keep_a = keep.as_ref().map(|a| a.as_array()); + let keep_offsets_a = keep_offsets.as_ref().map(|a| a.as_array()); + let to_rc_a = to_rc.as_ref().map(|a| a.as_array()); let (batch_size, ploidy) = geno_offset_idx_a.dim(); let n_work = batch_size * ploidy; - // Step 1: compute per-haplotype length diffs (reuses get_diffs_sparse core). - // Mirrors _haps.py _haplotype_ilens exactly: pass q_starts/q_ends/v_starts so - // partial deletions that span a query boundary are correctly clipped. - // q_starts = regions[:, 1], q_ends = regions[:, 2] (both already in regions_a). - // v_starts is the same array passed in — it is the per-variant genomic start. - let q_starts_owned: ndarray::Array1 = regions_a.column(1).to_owned(); - let q_ends_owned: ndarray::Array1 = regions_a.column(2).to_owned(); - let diffs = genotypes::get_diffs_sparse( - geno_offset_idx_a, - geno_v_idxs_a, - go_starts, - go_stops, - ilens_a, - keep.as_ref().map(|a| a.as_array()), - keep_offsets.as_ref().map(|a| a.as_array()), - Some(q_starts_owned.view()), // q_starts = regions[:, 1] - Some(q_ends_owned.view()), // q_ends = regions[:, 2] - Some(v_starts_a), // v_starts = per-variant genomic starts - parallel, - ); + let (out_data, out_offsets_vec) = py.detach(move || { + // Step 1: compute per-haplotype length diffs (reuses get_diffs_sparse core). + // Mirrors _haps.py _haplotype_ilens exactly: pass q_starts/q_ends/v_starts so + // partial deletions that span a query boundary are correctly clipped. + // q_starts = regions[:, 1], q_ends = regions[:, 2] (both already in regions_a). + // v_starts is the same array passed in — it is the per-variant genomic start. + let q_starts_owned: ndarray::Array1 = regions_a.column(1).to_owned(); + let q_ends_owned: ndarray::Array1 = regions_a.column(2).to_owned(); + let diffs = genotypes::get_diffs_sparse( + geno_offset_idx_a, + geno_v_idxs_a, + go_starts, + go_stops, + ilens_a, + keep_a, + keep_offsets_a, + Some(q_starts_owned.view()), // q_starts = regions[:, 1] + Some(q_ends_owned.view()), // q_ends = regions[:, 2] + Some(v_starts_a), // v_starts = per-variant genomic starts + parallel, + ); - // Step 2: compute per-haplotype output lengths and prefix-sum offsets. - // Mirrors the Python side: out_lengths = hap_lengths (or fixed output_length). - // hap_lengths = regions[:, 2] - regions[:, 1] + diffs (end - start + diff) - // out_offsets shape: (n_work + 1,) - let mut out_offsets_vec: Array1 = Array1::zeros(n_work + 1); - { - let mut acc: i64 = 0; - out_offsets_vec[0] = 0; - for k in 0..n_work { - let query = k / ploidy; - let hap = k % ploidy; - let len: i64 = if output_length >= 0 { - output_length - } else { - let ref_len = (regions_a[[query, 2]] - regions_a[[query, 1]]) as i64; - let diff = diffs[[query, hap]] as i64; - (ref_len + diff).max(0) - }; - acc += len; - out_offsets_vec[k + 1] = acc; + // Step 2: compute per-haplotype output lengths and prefix-sum offsets. + // Mirrors the Python side: out_lengths = hap_lengths (or fixed output_length). + // hap_lengths = regions[:, 2] - regions[:, 1] + diffs (end - start + diff) + // out_offsets shape: (n_work + 1,) + let mut out_offsets_vec: Array1 = Array1::zeros(n_work + 1); + { + let mut acc: i64 = 0; + out_offsets_vec[0] = 0; + for k in 0..n_work { + let query = k / ploidy; + let hap = k % ploidy; + let len: i64 = if output_length >= 0 { + output_length + } else { + let ref_len = (regions_a[[query, 2]] - regions_a[[query, 1]]) as i64; + let diff = diffs[[query, hap]] as i64; + (ref_len + diff).max(0) + }; + acc += len; + out_offsets_vec[k + 1] = acc; + } } - } - // Step 3: allocate the output buffer in Rust — Python never calls np.empty. - let total = out_offsets_vec[n_work] as usize; - let mut out_data: Array1 = uninit_output(total); - - // Step 4: reconstruct all haplotypes into the owned buffer (reuses batch core). - reconstruct::reconstruct_haplotypes_from_sparse( - out_data.view_mut(), - out_offsets_vec.view(), - regions_a, - shifts_a, - geno_offset_idx_a, - go_starts, - go_stops, - geno_v_idxs_a, - v_starts_a, - ilens_a, - alt_alleles.as_array(), - alt_offsets.as_array(), - ref_.as_array(), - ref_offsets.as_array(), - pad_char, - keep.as_ref().map(|k| k.as_array()), - keep_offsets.as_ref().map(|ko| ko.as_array()), - None, // annot_v_idxs — not supported in fused plain path - None, // annot_ref_pos — not supported in fused plain path - parallel, - ); + // Step 3: allocate the output buffer in Rust — Python never calls np.empty. + let total = out_offsets_vec[n_work] as usize; + let mut out_data: Array1 = uninit_output(total); - // Step 4b: optional in-kernel reverse-complement (one bool per (query, hap) work item). - if let Some(to_rc) = to_rc.as_ref() { - debug_assert_eq!( - to_rc.as_array().len(), - out_offsets_vec.len() - 1, - "to_rc mask length must equal number of output rows (offsets.len() - 1)" - ); - crate::reverse::rc_flat_rows_inplace( - out_data.as_slice_mut().unwrap(), + // Step 4: reconstruct all haplotypes into the owned buffer (reuses batch core). + reconstruct::reconstruct_haplotypes_from_sparse( + out_data.view_mut(), out_offsets_vec.view(), - to_rc.as_array(), + regions_a, + shifts_a, + geno_offset_idx_a, + go_starts, + go_stops, + geno_v_idxs_a, + v_starts_a, + ilens_a, + alt_alleles_a, + alt_offsets_a, + ref_a, + ref_offsets_a, + pad_char, + keep_a, + keep_offsets_a, + None, // annot_v_idxs — not supported in fused plain path + None, // annot_ref_pos — not supported in fused plain path + parallel, ); - } + + // Step 4b: optional in-kernel reverse-complement (one bool per (query, hap) work item). + if let Some(to_rc) = to_rc_a.as_ref() { + debug_assert_eq!( + to_rc.len(), + out_offsets_vec.len() - 1, + "to_rc mask length must equal number of output rows (offsets.len() - 1)" + ); + crate::reverse::rc_flat_rows_inplace( + out_data.as_slice_mut().unwrap(), + out_offsets_vec.view(), + *to_rc, + ); + } + + (out_data, out_offsets_vec) + }); // Step 5: return owned arrays — Python wraps them with no further coercions. (out_data.into_pyarray(py), out_offsets_vec.into_pyarray(py)) @@ -726,50 +781,68 @@ pub fn reconstruct_haplotypes_spliced_fused<'py>( // out_offsets are precomputed by the Python splice plan — use them directly. let out_offsets_a = out_offsets.as_array(); - let total = out_offsets_a[out_offsets_a.len() - 1] as usize; - - // Allocate output buffer. - let mut out_data: Array1 = uninit_output(total); - - // Reconstruct all haplotypes into the owned buffer (reuses batch core). - reconstruct::reconstruct_haplotypes_from_sparse( - out_data.view_mut(), - out_offsets_a, - permuted_regions.as_array(), - flat_shifts.as_array(), - flat_geno_offset_idx.as_array(), - go_starts, - go_stops, - geno_v_idxs.as_array(), - v_starts.as_array(), - ilens.as_array(), - alt_alleles.as_array(), - alt_offsets.as_array(), - ref_.as_array(), - ref_offsets.as_array(), - pad_char, - keep.as_ref().map(|k| k.as_array()), - keep_offsets.as_ref().map(|ko| ko.as_array()), - None, // annot_v_idxs — not used in splice path - None, // annot_ref_pos — not used in splice path - parallel, - ); - - // Optional in-place RC per permuted element (negative-strand haplotypes). - // out_offsets_a is the permuted per-element offsets array (splice_plan.permuted_out_offsets), - // so each masked element is RC'd in its own byte range — matching the to_rc_per_elem post-pass. - if let Some(to_rc) = to_rc.as_ref() { - debug_assert_eq!( - to_rc.as_array().len(), - out_offsets_a.len() - 1, - "to_rc mask length must equal number of output rows (offsets.len() - 1)" - ); - crate::reverse::rc_flat_rows_inplace( - out_data.as_slice_mut().unwrap(), + let permuted_regions_a = permuted_regions.as_array(); + let flat_shifts_a = flat_shifts.as_array(); + let flat_geno_offset_idx_a = flat_geno_offset_idx.as_array(); + let geno_v_idxs_a = geno_v_idxs.as_array(); + let v_starts_a = v_starts.as_array(); + let ilens_a = ilens.as_array(); + let alt_alleles_a = alt_alleles.as_array(); + let alt_offsets_a = alt_offsets.as_array(); + let ref_a = ref_.as_array(); + let ref_offsets_a = ref_offsets.as_array(); + let keep_a = keep.as_ref().map(|k| k.as_array()); + let keep_offsets_a = keep_offsets.as_ref().map(|ko| ko.as_array()); + let to_rc_a = to_rc.as_ref().map(|a| a.as_array()); + + let out_data = py.detach(move || { + let total = out_offsets_a[out_offsets_a.len() - 1] as usize; + + // Allocate output buffer. + let mut out_data: Array1 = uninit_output(total); + + // Reconstruct all haplotypes into the owned buffer (reuses batch core). + reconstruct::reconstruct_haplotypes_from_sparse( + out_data.view_mut(), out_offsets_a, - to_rc.as_array(), + permuted_regions_a, + flat_shifts_a, + flat_geno_offset_idx_a, + go_starts, + go_stops, + geno_v_idxs_a, + v_starts_a, + ilens_a, + alt_alleles_a, + alt_offsets_a, + ref_a, + ref_offsets_a, + pad_char, + keep_a, + keep_offsets_a, + None, // annot_v_idxs — not used in splice path + None, // annot_ref_pos — not used in splice path + parallel, ); - } + + // Optional in-place RC per permuted element (negative-strand haplotypes). + // out_offsets_a is the permuted per-element offsets array (splice_plan.permuted_out_offsets), + // so each masked element is RC'd in its own byte range — matching the to_rc_per_elem post-pass. + if let Some(to_rc) = to_rc_a.as_ref() { + debug_assert_eq!( + to_rc.len(), + out_offsets_a.len() - 1, + "to_rc mask length must equal number of output rows (offsets.len() - 1)" + ); + crate::reverse::rc_flat_rows_inplace( + out_data.as_slice_mut().unwrap(), + out_offsets_a, + *to_rc, + ); + } + + out_data + }); // Return out_data only — Python already holds out_offsets (no round-trip). out_data.into_pyarray(py) @@ -826,52 +899,70 @@ pub fn reconstruct_annotated_haplotypes_spliced_fused<'py>( // out_offsets are precomputed by the Python splice plan — use them directly. let out_offsets_a = out_offsets.as_array(); - let total = out_offsets_a[out_offsets_a.len() - 1] as usize; - - // Allocate the sequence + annotation buffers. - let mut out_data: Array1 = uninit_output(total); - let mut annot_v: Array1 = uninit_output(total); - let mut annot_pos: Array1 = uninit_output(total); - - // Reconstruct all haplotypes + annotations into the owned buffers (reuses batch core). - reconstruct::reconstruct_haplotypes_from_sparse( - out_data.view_mut(), - out_offsets_a, - permuted_regions.as_array(), - flat_shifts.as_array(), - flat_geno_offset_idx.as_array(), - go_starts, - go_stops, - geno_v_idxs.as_array(), - v_starts.as_array(), - ilens.as_array(), - alt_alleles.as_array(), - alt_offsets.as_array(), - ref_.as_array(), - ref_offsets.as_array(), - pad_char, - keep.as_ref().map(|k| k.as_array()), - keep_offsets.as_ref().map(|ko| ko.as_array()), - Some(annot_v.view_mut()), // annot_v_idxs — variant index per nucleotide - Some(annot_pos.view_mut()), // annot_ref_pos — reference coordinate per nucleotide - parallel, - ); - - // Optional in-place RC per permuted element. Sequence bytes are reverse-complemented; - // annotation rows are reversed only (no complement) — matching - // _FlatAnnotatedHaps.reverse_masked. out_offsets_a is the permuted per-element - // offsets array, so each masked element is transformed in its own byte range. - if let Some(to_rc) = to_rc.as_ref() { - let m = to_rc.as_array(); - debug_assert_eq!( - m.len(), - out_offsets_a.len() - 1, - "to_rc mask length must equal number of output rows (offsets.len() - 1)" + let permuted_regions_a = permuted_regions.as_array(); + let flat_shifts_a = flat_shifts.as_array(); + let flat_geno_offset_idx_a = flat_geno_offset_idx.as_array(); + let geno_v_idxs_a = geno_v_idxs.as_array(); + let v_starts_a = v_starts.as_array(); + let ilens_a = ilens.as_array(); + let alt_alleles_a = alt_alleles.as_array(); + let alt_offsets_a = alt_offsets.as_array(); + let ref_a = ref_.as_array(); + let ref_offsets_a = ref_offsets.as_array(); + let keep_a = keep.as_ref().map(|k| k.as_array()); + let keep_offsets_a = keep_offsets.as_ref().map(|ko| ko.as_array()); + let to_rc_a = to_rc.as_ref().map(|a| a.as_array()); + + let (out_data, annot_v, annot_pos) = py.detach(move || { + let total = out_offsets_a[out_offsets_a.len() - 1] as usize; + + // Allocate the sequence + annotation buffers. + let mut out_data: Array1 = uninit_output(total); + let mut annot_v: Array1 = uninit_output(total); + let mut annot_pos: Array1 = uninit_output(total); + + // Reconstruct all haplotypes + annotations into the owned buffers (reuses batch core). + reconstruct::reconstruct_haplotypes_from_sparse( + out_data.view_mut(), + out_offsets_a, + permuted_regions_a, + flat_shifts_a, + flat_geno_offset_idx_a, + go_starts, + go_stops, + geno_v_idxs_a, + v_starts_a, + ilens_a, + alt_alleles_a, + alt_offsets_a, + ref_a, + ref_offsets_a, + pad_char, + keep_a, + keep_offsets_a, + Some(annot_v.view_mut()), // annot_v_idxs — variant index per nucleotide + Some(annot_pos.view_mut()), // annot_ref_pos — reference coordinate per nucleotide + parallel, ); - crate::reverse::rc_flat_rows_inplace(out_data.as_slice_mut().unwrap(), out_offsets_a, m); - crate::reverse::reverse_flat_rows_inplace(annot_v.as_slice_mut().unwrap(), out_offsets_a, m); - crate::reverse::reverse_flat_rows_inplace(annot_pos.as_slice_mut().unwrap(), out_offsets_a, m); - } + + // Optional in-place RC per permuted element. Sequence bytes are reverse-complemented; + // annotation rows are reversed only (no complement) — matching + // _FlatAnnotatedHaps.reverse_masked. out_offsets_a is the permuted per-element + // offsets array, so each masked element is transformed in its own byte range. + if let Some(to_rc) = to_rc_a.as_ref() { + let m = *to_rc; + debug_assert_eq!( + m.len(), + out_offsets_a.len() - 1, + "to_rc mask length must equal number of output rows (offsets.len() - 1)" + ); + crate::reverse::rc_flat_rows_inplace(out_data.as_slice_mut().unwrap(), out_offsets_a, m); + crate::reverse::reverse_flat_rows_inplace(annot_v.as_slice_mut().unwrap(), out_offsets_a, m); + crate::reverse::reverse_flat_rows_inplace(annot_pos.as_slice_mut().unwrap(), out_offsets_a, m); + } + + (out_data, annot_v, annot_pos) + }); ( out_data.into_pyarray(py), @@ -944,95 +1035,107 @@ pub fn reconstruct_annotated_haplotypes_fused<'py>( let geno_v_idxs_a = geno_v_idxs.as_array(); let v_starts_a = v_starts.as_array(); let ilens_a = ilens.as_array(); + let alt_alleles_a = alt_alleles.as_array(); + let alt_offsets_a = alt_offsets.as_array(); + let ref_a = ref_.as_array(); + let ref_offsets_a = ref_offsets.as_array(); + let keep_a = keep.as_ref().map(|a| a.as_array()); + let keep_offsets_a = keep_offsets.as_ref().map(|a| a.as_array()); + let to_rc_a = to_rc.as_ref().map(|a| a.as_array()); let (batch_size, ploidy) = geno_offset_idx_a.dim(); let n_work = batch_size * ploidy; - // Step 1: compute per-haplotype length diffs (reuses get_diffs_sparse core). - // Mirrors _haps.py _haplotype_ilens exactly: pass q_starts/q_ends/v_starts so - // partial deletions that span a query boundary are correctly clipped. - // q_starts = regions[:, 1], q_ends = regions[:, 2] (both already in regions_a). - // v_starts is the same array passed in — it is the per-variant genomic start. - let q_starts_owned: ndarray::Array1 = regions_a.column(1).to_owned(); - let q_ends_owned: ndarray::Array1 = regions_a.column(2).to_owned(); - let diffs = genotypes::get_diffs_sparse( - geno_offset_idx_a, - geno_v_idxs_a, - go_starts, - go_stops, - ilens_a, - keep.as_ref().map(|a| a.as_array()), - keep_offsets.as_ref().map(|a| a.as_array()), - Some(q_starts_owned.view()), // q_starts = regions[:, 1] - Some(q_ends_owned.view()), // q_ends = regions[:, 2] - Some(v_starts_a), // v_starts = per-variant genomic starts - parallel, - ); + let (out_data, annot_v, annot_pos, out_offsets_vec) = py.detach(move || { + // Step 1: compute per-haplotype length diffs (reuses get_diffs_sparse core). + // Mirrors _haps.py _haplotype_ilens exactly: pass q_starts/q_ends/v_starts so + // partial deletions that span a query boundary are correctly clipped. + // q_starts = regions[:, 1], q_ends = regions[:, 2] (both already in regions_a). + // v_starts is the same array passed in — it is the per-variant genomic start. + let q_starts_owned: ndarray::Array1 = regions_a.column(1).to_owned(); + let q_ends_owned: ndarray::Array1 = regions_a.column(2).to_owned(); + let diffs = genotypes::get_diffs_sparse( + geno_offset_idx_a, + geno_v_idxs_a, + go_starts, + go_stops, + ilens_a, + keep_a, + keep_offsets_a, + Some(q_starts_owned.view()), // q_starts = regions[:, 1] + Some(q_ends_owned.view()), // q_ends = regions[:, 2] + Some(v_starts_a), // v_starts = per-variant genomic starts + parallel, + ); - // Step 2: compute per-haplotype output lengths and prefix-sum offsets. - // Mirrors the Python side: out_lengths = hap_lengths (or fixed output_length). - // hap_lengths = regions[:, 2] - regions[:, 1] + diffs (end - start + diff) - // out_offsets shape: (n_work + 1,) - let mut out_offsets_vec: Array1 = Array1::zeros(n_work + 1); - { - let mut acc: i64 = 0; - out_offsets_vec[0] = 0; - for k in 0..n_work { - let query = k / ploidy; - let hap = k % ploidy; - let len: i64 = if output_length >= 0 { - output_length - } else { - let ref_len = (regions_a[[query, 2]] - regions_a[[query, 1]]) as i64; - let diff = diffs[[query, hap]] as i64; - (ref_len + diff).max(0) - }; - acc += len; - out_offsets_vec[k + 1] = acc; + // Step 2: compute per-haplotype output lengths and prefix-sum offsets. + // Mirrors the Python side: out_lengths = hap_lengths (or fixed output_length). + // hap_lengths = regions[:, 2] - regions[:, 1] + diffs (end - start + diff) + // out_offsets shape: (n_work + 1,) + let mut out_offsets_vec: Array1 = Array1::zeros(n_work + 1); + { + let mut acc: i64 = 0; + out_offsets_vec[0] = 0; + for k in 0..n_work { + let query = k / ploidy; + let hap = k % ploidy; + let len: i64 = if output_length >= 0 { + output_length + } else { + let ref_len = (regions_a[[query, 2]] - regions_a[[query, 1]]) as i64; + let diff = diffs[[query, hap]] as i64; + (ref_len + diff).max(0) + }; + acc += len; + out_offsets_vec[k + 1] = acc; + } } - } - // Step 3: allocate the output buffer and annotation buffers in Rust. - let total = out_offsets_vec[n_work] as usize; - let mut out_data: Array1 = uninit_output(total); - let mut annot_v: Array1 = uninit_output(total); - let mut annot_pos: Array1 = uninit_output(total); - - // Step 4: reconstruct all haplotypes into the owned buffers (reuses batch core). - reconstruct::reconstruct_haplotypes_from_sparse( - out_data.view_mut(), - out_offsets_vec.view(), - regions_a, - shifts_a, - geno_offset_idx_a, - go_starts, - go_stops, - geno_v_idxs_a, - v_starts_a, - ilens_a, - alt_alleles.as_array(), - alt_offsets.as_array(), - ref_.as_array(), - ref_offsets.as_array(), - pad_char, - keep.as_ref().map(|k| k.as_array()), - keep_offsets.as_ref().map(|ko| ko.as_array()), - Some(annot_v.view_mut()), // annot_v_idxs — variant index per nucleotide - Some(annot_pos.view_mut()), // annot_ref_pos — reference coordinate per nucleotide - parallel, - ); + // Step 3: allocate the output buffer and annotation buffers in Rust. + let total = out_offsets_vec[n_work] as usize; + let mut out_data: Array1 = uninit_output(total); + let mut annot_v: Array1 = uninit_output(total); + let mut annot_pos: Array1 = uninit_output(total); - if let Some(to_rc) = to_rc.as_ref() { - let m = to_rc.as_array(); - debug_assert_eq!( - m.len(), - out_offsets_vec.len() - 1, - "to_rc mask length must equal number of output rows (offsets.len() - 1)" + // Step 4: reconstruct all haplotypes into the owned buffers (reuses batch core). + reconstruct::reconstruct_haplotypes_from_sparse( + out_data.view_mut(), + out_offsets_vec.view(), + regions_a, + shifts_a, + geno_offset_idx_a, + go_starts, + go_stops, + geno_v_idxs_a, + v_starts_a, + ilens_a, + alt_alleles_a, + alt_offsets_a, + ref_a, + ref_offsets_a, + pad_char, + keep_a, + keep_offsets_a, + Some(annot_v.view_mut()), // annot_v_idxs — variant index per nucleotide + Some(annot_pos.view_mut()), // annot_ref_pos — reference coordinate per nucleotide + parallel, ); - crate::reverse::rc_flat_rows_inplace(out_data.as_slice_mut().unwrap(), out_offsets_vec.view(), m); - crate::reverse::reverse_flat_rows_inplace(annot_v.as_slice_mut().unwrap(), out_offsets_vec.view(), m); - crate::reverse::reverse_flat_rows_inplace(annot_pos.as_slice_mut().unwrap(), out_offsets_vec.view(), m); - } + + if let Some(to_rc) = to_rc_a.as_ref() { + let m = *to_rc; + debug_assert_eq!( + m.len(), + out_offsets_vec.len() - 1, + "to_rc mask length must equal number of output rows (offsets.len() - 1)" + ); + crate::reverse::rc_flat_rows_inplace(out_data.as_slice_mut().unwrap(), out_offsets_vec.view(), m); + crate::reverse::reverse_flat_rows_inplace(annot_v.as_slice_mut().unwrap(), out_offsets_vec.view(), m); + crate::reverse::reverse_flat_rows_inplace(annot_pos.as_slice_mut().unwrap(), out_offsets_vec.view(), m); + } + + (out_data, annot_v, annot_pos, out_offsets_vec) + }); + // Step 5: return owned arrays — Python wraps them with no further coercions. ( out_data.into_pyarray(py), @@ -1055,15 +1158,22 @@ pub fn get_reference<'py>( parallel: bool, to_rc: Option>, ) -> Bound<'py, PyArray1> { - let out = reference::get_reference( - regions.as_array(), - out_offsets.as_array(), - reference.as_array(), - ref_offsets.as_array(), - pad_char, - parallel, - to_rc.as_ref().map(|a| a.as_array()), - ); + let regions_a = regions.as_array(); + let out_offsets_a = out_offsets.as_array(); + let reference_a = reference.as_array(); + let ref_offsets_a = ref_offsets.as_array(); + let to_rc_a = to_rc.as_ref().map(|a| a.as_array()); + let out = py.detach(move || { + reference::get_reference( + regions_a, + out_offsets_a, + reference_a, + ref_offsets_a, + pad_char, + parallel, + to_rc_a, + ) + }); out.into_pyarray(py) } @@ -1076,6 +1186,7 @@ pub fn get_reference<'py>( #[pyfunction] #[allow(clippy::too_many_arguments)] pub fn shift_and_realign_tracks_sparse( + py: Python<'_>, mut out: PyReadwriteArray1, out_offsets: PyReadonlyArray1, regions: PyReadonlyArray2, @@ -1096,26 +1207,43 @@ pub fn shift_and_realign_tracks_sparse( ) { use crate::tracks; let go = geno_offsets.as_array(); - tracks::shift_and_realign_tracks_sparse( - out.as_array_mut(), - out_offsets.as_array(), - regions.as_array(), - shifts.as_array(), - geno_offset_idx.as_array(), - geno_v_idxs.as_array(), - go.row(0), - go.row(1), - v_starts.as_array(), - ilens.as_array(), - tracks.as_array(), - track_offsets.as_array(), - params.as_array(), - keep.as_ref().map(|k| k.as_array()), - keep_offsets.as_ref().map(|ko| ko.as_array()), - strategy_id, - base_seed, - parallel, - ); + let go_starts = go.row(0); + let go_stops = go.row(1); + let out_a = out.as_array_mut(); + let out_offsets_a = out_offsets.as_array(); + let regions_a = regions.as_array(); + let shifts_a = shifts.as_array(); + let geno_offset_idx_a = geno_offset_idx.as_array(); + let geno_v_idxs_a = geno_v_idxs.as_array(); + let v_starts_a = v_starts.as_array(); + let ilens_a = ilens.as_array(); + let tracks_a = tracks.as_array(); + let track_offsets_a = track_offsets.as_array(); + let params_a = params.as_array(); + let keep_a = keep.as_ref().map(|k| k.as_array()); + let keep_offsets_a = keep_offsets.as_ref().map(|ko| ko.as_array()); + py.detach(move || { + tracks::shift_and_realign_tracks_sparse( + out_a, + out_offsets_a, + regions_a, + shifts_a, + geno_offset_idx_a, + geno_v_idxs_a, + go_starts, + go_stops, + v_starts_a, + ilens_a, + tracks_a, + track_offsets_a, + params_a, + keep_a, + keep_offsets_a, + strategy_id, + base_seed, + parallel, + ); + }); } /// RLE-encode a ragged f32 track buffer into (starts, ends, values, offsets). @@ -1136,12 +1264,12 @@ pub fn tracks_to_intervals<'py>( Bound<'py, PyArray1>, ) { use crate::tracks; - let (starts, ends, values, offsets) = tracks::tracks_to_intervals( - regions.as_array(), - tracks.as_array(), - track_offsets.as_array(), - parallel, - ); + let regions_a = regions.as_array(); + let tracks_a = tracks.as_array(); + let track_offsets_a = track_offsets.as_array(); + let (starts, ends, values, offsets) = py.detach(move || { + tracks::tracks_to_intervals(regions_a, tracks_a, track_offsets_a, parallel) + }); ( starts.into_pyarray(py), ends.into_pyarray(py), @@ -1172,6 +1300,7 @@ pub fn tracks_to_intervals<'py>( #[pyfunction] #[allow(clippy::too_many_arguments)] pub fn intervals_and_realign_track_fused( + py: Python<'_>, mut out: PyReadwriteArray1, // (b*p*l) — caller's per-track slice out_offsets: PyReadonlyArray1, // (b*p + 1) regions: PyReadonlyArray2, // (b, 3) @@ -1206,70 +1335,87 @@ pub fn intervals_and_realign_track_fused( let out_offsets_a = out_offsets.as_array(); let regions_a = regions.as_array(); - - // Determine scratch buffer size from track_offsets. let track_offsets_a = track_offsets.as_array(); - let scratch_len = track_offsets_a[track_offsets_a.len() - 1] as usize; - - // Allocate Rust-side scratch buffer — replaces Python `_tracks = np.empty(...)`. - // intervals_to_tracks calls out.fill(0.0) as its first step, so full-write is - // guaranteed; uninit_output is safe here. - let mut scratch = uninit_output::(scratch_len); - - // Extract query starts (regions[:, 1]) as a contiguous owned array. - // regions_a.column(1) is a non-contiguous view (row-major storage); we - // must own/contiguify it before passing to intervals_to_tracks which - // expects a contiguous ArrayView1. - let q_starts: ndarray::Array1 = regions_a.column(1).to_owned(); - - // Step 1: paint reference-coordinate intervals into scratch (reuses intervals core). - intervals::intervals_to_tracks( - offset_idxs.as_array(), - q_starts.view(), - itv_starts.as_array(), - itv_ends.as_array(), - itv_values.as_array(), - itv_offsets.as_array(), - scratch.view_mut(), - track_offsets_a, - parallel, - ); - - // Step 2: shift and realign into caller's out slice (reuses tracks core). - tracks::shift_and_realign_tracks_sparse( - out.as_array_mut(), - out_offsets_a, - regions_a, - shifts.as_array(), - geno_offset_idx.as_array(), - geno_v_idxs.as_array(), - go_starts, - go_stops, - v_starts.as_array(), - ilens.as_array(), - scratch.view(), - track_offsets_a, - params.as_array(), - keep.as_ref().map(|k| k.as_array()), - keep_offsets.as_ref().map(|ko| ko.as_array()), - strategy_id, - base_seed, - parallel, - ); - - // Step 3: optional in-place reverse for negative-strand tracks (reverse only, no complement). - if let Some(to_rc) = to_rc.as_ref() { - debug_assert_eq!( - to_rc.as_array().len(), - out_offsets.as_array().len() - 1, - "to_rc mask length must equal number of output rows (offsets.len() - 1)" + let mut out_a = out.as_array_mut(); + let shifts_a = shifts.as_array(); + let geno_offset_idx_a = geno_offset_idx.as_array(); + let geno_v_idxs_a = geno_v_idxs.as_array(); + let v_starts_a = v_starts.as_array(); + let ilens_a = ilens.as_array(); + let offset_idxs_a = offset_idxs.as_array(); + let itv_starts_a = itv_starts.as_array(); + let itv_ends_a = itv_ends.as_array(); + let itv_values_a = itv_values.as_array(); + let itv_offsets_a = itv_offsets.as_array(); + let params_a = params.as_array(); + let keep_a = keep.as_ref().map(|k| k.as_array()); + let keep_offsets_a = keep_offsets.as_ref().map(|ko| ko.as_array()); + let to_rc_a = to_rc.as_ref().map(|a| a.as_array()); + + py.detach(move || { + // Determine scratch buffer size from track_offsets. + let scratch_len = track_offsets_a[track_offsets_a.len() - 1] as usize; + + // Allocate Rust-side scratch buffer — replaces Python `_tracks = np.empty(...)`. + // intervals_to_tracks calls out.fill(0.0) as its first step, so full-write is + // guaranteed; uninit_output is safe here. + let mut scratch = uninit_output::(scratch_len); + + // Extract query starts (regions[:, 1]) as a contiguous owned array. + // regions_a.column(1) is a non-contiguous view (row-major storage); we + // must own/contiguify it before passing to intervals_to_tracks which + // expects a contiguous ArrayView1. + let q_starts: ndarray::Array1 = regions_a.column(1).to_owned(); + + // Step 1: paint reference-coordinate intervals into scratch (reuses intervals core). + intervals::intervals_to_tracks( + offset_idxs_a, + q_starts.view(), + itv_starts_a, + itv_ends_a, + itv_values_a, + itv_offsets_a, + scratch.view_mut(), + track_offsets_a, + parallel, ); - crate::reverse::reverse_flat_rows_inplace( - out.as_slice_mut().unwrap(), - out_offsets.as_array(), - to_rc.as_array(), + + // Step 2: shift and realign into caller's out slice (reuses tracks core). + tracks::shift_and_realign_tracks_sparse( + out_a.view_mut(), + out_offsets_a, + regions_a, + shifts_a, + geno_offset_idx_a, + geno_v_idxs_a, + go_starts, + go_stops, + v_starts_a, + ilens_a, + scratch.view(), + track_offsets_a, + params_a, + keep_a, + keep_offsets_a, + strategy_id, + base_seed, + parallel, ); - } + + // Step 3: optional in-place reverse for negative-strand tracks (reverse only, no complement). + if let Some(to_rc) = to_rc_a.as_ref() { + debug_assert_eq!( + to_rc.len(), + out_offsets_a.len() - 1, + "to_rc mask length must equal number of output rows (offsets.len() - 1)" + ); + crate::reverse::reverse_flat_rows_inplace( + out_a.as_slice_mut().unwrap(), + out_offsets_a, + *to_rc, + ); + } + }); Ok(()) } diff --git a/tests/integration/test_rayon_forced_parallel.py b/tests/integration/test_rayon_forced_parallel.py new file mode 100644 index 00000000..72e6c24c --- /dev/null +++ b/tests/integration/test_rayon_forced_parallel.py @@ -0,0 +1,89 @@ +"""Forced-parallel dispatch must be byte-identical to the serial size-gated path. + +GVL_FORCE_PARALLEL=1 makes should_parallelize() return True, so dataset[...] +runs the real rayon path end-to-end on the small test corpus — coverage the +tiny-golden parity suite cannot reach. + +Note: parametrized only for "haplotypes" because RaggedVariants.data is an +object-dtype array of dicts that np.testing.assert_array_equal cannot compare +element-wise; the haplotypes path fully covers the rayon reconstruct+track path. +""" + +from __future__ import annotations + +from pathlib import Path + +import numpy as np +import pyBigWig +import pytest +from genoray import VCF + +import genvarloader as gvl + + +@pytest.fixture() +def variant_track_dataset(source_bed, vcf_dir, reference, tmp_path: Path): + """A haplotypes+track dataset whose getitem hits reconstruct + intervals_to_tracks.""" + contig_sizes = [("chr1", 2_000_000), ("chr2", 2_000_000)] + bw_paths: dict[str, str] = {} + for i, sample in enumerate(["s0", "s1", "s2"]): + bw_path = tmp_path / f"{sample}.bw" + with pyBigWig.open(str(bw_path), "w") as bw: + bw.addHeader(contig_sizes, maxZooms=0) + value = float(i + 1) + bw.addEntries( + ["chr1", "chr1", "chr2", "chr2"], + [499_990, 1_010_686, 17_320, 1_234_560], + ends=[500_030, 1_010_706, 17_340, 1_234_580], + values=[value, value, value, value], + ) + bw_paths[sample] = str(bw_path) + out = tmp_path / "ds.gvl" + gvl.write( + path=out, + bed=source_bed, + variants=VCF(vcf_dir / "filtered_source.vcf.gz"), + tracks=gvl.BigWigs("5ss", bw_paths), + max_jitter=2, + ) + return out + + +def _materialize(ds): + """Reduce a getitem result to a list of numpy arrays for comparison. + + For ragged outputs we compare BOTH the flat data buffer and the offsets + (row boundaries), so a same-bytes/different-layout difference cannot slip + through the byte-identical equivalence check. + """ + out = ds[:, :] + items = out if isinstance(out, tuple) else (out,) + arrays = [] + for it in items: + # Ragged-like objects expose .data (flat buffer) and .offsets (row + # boundaries); dense arrays are ndarrays already. + arrays.append(np.asarray(getattr(it, "data", it))) + offsets = getattr(it, "offsets", None) + if offsets is not None: + arrays.append(np.asarray(offsets)) + return arrays + + +@pytest.mark.parametrize("seq_kind", ["haplotypes"]) +def test_forced_parallel_matches_serial( + variant_track_dataset, reference, monkeypatch, seq_kind +): + def open_ds(): + return gvl.Dataset.open(variant_track_dataset, reference=reference).with_seqs( + seq_kind + ) + + monkeypatch.delenv("GVL_FORCE_PARALLEL", raising=False) + serial = _materialize(open_ds()) + + monkeypatch.setenv("GVL_FORCE_PARALLEL", "1") + parallel = _materialize(open_ds()) + + assert len(serial) == len(parallel) + for s, p in zip(serial, parallel): + np.testing.assert_array_equal(s, p) diff --git a/tests/integration/test_rayon_stress.py b/tests/integration/test_rayon_stress.py new file mode 100644 index 00000000..03d45254 --- /dev/null +++ b/tests/integration/test_rayon_stress.py @@ -0,0 +1,101 @@ +"""Stress reproducer for issue #263: concurrent spawn workers iterating a +Dataset under forced-parallel + oversubscribed rayon must not deadlock. + +A hang (futex-parked workers, per #263) surfaces as future.result(timeout=...) +raising TimeoutError → test failure. Clean completion across repeated launches +is evidence the cause was oversubscription (fixed by the cap_threads changes). +""" + +from __future__ import annotations + +import multiprocessing as mp +import os +from concurrent.futures import ProcessPoolExecutor, TimeoutError as FTimeoutError +from pathlib import Path + +import numpy as np +import pyBigWig +import pytest +from genoray import VCF + +import genvarloader as gvl + +pytestmark = pytest.mark.slow + +# Tuning: if a slow CI box trips the timeout WITHOUT a real hang, lower +# ITERS_PER_WORKER/LAUNCHES or raise PER_LAUNCH_TIMEOUT_S — never remove the +# timeout (it is the deadlock detector). Keep N_WORKERS*RAYON_NUM_THREADS > cores. +N_WORKERS = 5 +ITERS_PER_WORKER = 40 +LAUNCHES = 4 +PER_LAUNCH_TIMEOUT_S = 120 + + +def _iterate_dataset(ds_path: str, reference_path: str, iters: int) -> int: + """Worker body (must be importable/picklable for spawn). Returns bytes touched.""" + # Force the parallel path and oversubscribe with a fixed per-worker thread + # count. cap_threads() OVERWRITES RAYON_NUM_THREADS with GVL's resolved cap, + # so GVL_NUM_THREADS=8 (not a bare RAYON_NUM_THREADS) is what pins each worker + # to 8 rayon threads → N_WORKERS*8 threads regardless of host core count. + os.environ["GVL_FORCE_PARALLEL"] = "1" + os.environ["GVL_NUM_THREADS"] = "8" + os.environ["RAYON_NUM_THREADS"] = "8" + ds = gvl.Dataset.open(Path(ds_path), reference=Path(reference_path)).with_seqs( + "haplotypes" + ) + total = 0 + n = len(ds) + for _ in range(iters): + out = ds[:n, :] + first = out[0] if isinstance(out, tuple) else out + total += int(np.asarray(getattr(first, "data", first)).size) + return total + + +@pytest.fixture() +def stress_dataset(source_bed, vcf_dir, reference, tmp_path: Path) -> tuple[Path, Path]: + contig_sizes = [("chr1", 2_000_000), ("chr2", 2_000_000)] + bw_paths: dict[str, str] = {} + for i, sample in enumerate(["s0", "s1", "s2"]): + bw_path = tmp_path / f"{sample}.bw" + with pyBigWig.open(str(bw_path), "w") as bw: + bw.addHeader(contig_sizes, maxZooms=0) + v = float(i + 1) + bw.addEntries( + ["chr1", "chr2"], + [499_990, 17_320], + ends=[500_030, 17_340], + values=[v, v], + ) + bw_paths[sample] = str(bw_path) + out = tmp_path / "stress.gvl" + gvl.write( + path=out, + bed=source_bed, + variants=VCF(vcf_dir / "filtered_source.vcf.gz"), + tracks=gvl.BigWigs("5ss", bw_paths), + max_jitter=2, + ) + return out, reference.path + + +def test_concurrent_spawn_workers_do_not_deadlock(stress_dataset): + ds_path, ref_path = stress_dataset + ctx = mp.get_context("spawn") + for launch in range(LAUNCHES): + with ProcessPoolExecutor(max_workers=N_WORKERS, mp_context=ctx) as ex: + futs = [ + ex.submit( + _iterate_dataset, str(ds_path), str(ref_path), ITERS_PER_WORKER + ) + for _ in range(N_WORKERS) + ] + try: + results = [f.result(timeout=PER_LAUNCH_TIMEOUT_S) for f in futs] + except FTimeoutError: + pytest.fail( + f"launch {launch}: worker did not finish within " + f"{PER_LAUNCH_TIMEOUT_S}s — likely the #263 rayon deadlock." + ) + raise # unreachable (pytest.fail raises); marks branch NoReturn for pyrefly + assert all(r > 0 for r in results) diff --git a/tests/parity/test_gen_dataset_goldens.py b/tests/parity/test_gen_dataset_goldens.py index 4e6de5f8..087649ce 100644 --- a/tests/parity/test_gen_dataset_goldens.py +++ b/tests/parity/test_gen_dataset_goldens.py @@ -260,11 +260,13 @@ def test_gen_variants_af(phased_svar_gvl, reference, monkeypatch): ds = ds_base.with_seqs("variants").with_settings(min_af=0.1, max_af=0.9) except Exception as e: pytest.skip(f"AF filtering unavailable: {e}") + raise # unreachable (skip raises); tells pyrefly this branch is NoReturn try: monkeypatch.setenv("GVL_BACKEND", "numba") out_numba = ds[:, :] except KeyError as e: pytest.skip(f"AF key missing: {e}") + raise # unreachable; NoReturn marker for pyrefly monkeypatch.setenv("GVL_BACKEND", "rust") out_rust = ds[:, :] _oracle_check(out_numba, out_rust, "ds_variants_af") diff --git a/tests/parity/test_variants_dataset_parity.py b/tests/parity/test_variants_dataset_parity.py index d63b46be..e15399a0 100644 --- a/tests/parity/test_variants_dataset_parity.py +++ b/tests/parity/test_variants_dataset_parity.py @@ -89,12 +89,14 @@ def test_variants_af_filter_parity(phased_svar_gvl, reference): f"AF filtering unavailable on this dataset — skipping compact_keep " f"exercise ({type(e).__name__}: {e})" ) + raise # unreachable; NoReturn marker for pyrefly # Load golden — may not exist if AF was unavailable at generation time. try: golden = _golden.load_flat_golden("ds_variants_af") except FileNotFoundError: pytest.skip("ds_variants_af golden not generated (AF unavailable at gen time)") + raise # unreachable; NoReturn marker for pyrefly spy_fn, ck_calls, restore = _golden.make_kernel_spy("compact_keep_i32") try: diff --git a/tests/unit/test_threads.py b/tests/unit/test_threads.py index f28350a9..50f3a8af 100644 --- a/tests/unit/test_threads.py +++ b/tests/unit/test_threads.py @@ -1,5 +1,8 @@ +import math import os +import pytest + import genvarloader._threads as th @@ -48,3 +51,95 @@ def test_should_parallelize_threshold(monkeypatch): thresh = 4 * th._MIN_BYTES_PER_THREAD assert th.should_parallelize(thresh - 1) is False assert th.should_parallelize(thresh) is True + + +@pytest.mark.parametrize("val", ["1", "true", "TRUE", "yes", "on", "On"]) +def test_force_parallel_truthy(monkeypatch, val): + monkeypatch.setenv("GVL_FORCE_PARALLEL", val) + # Below the byte threshold, but forced on → parallel. + assert th.should_parallelize(0) is True + + +@pytest.mark.parametrize("val", ["0", "false", "no", "off", "", "banana"]) +def test_force_parallel_falsy_falls_back_to_threshold(monkeypatch, val): + monkeypatch.setattr(th, "_NUM_THREADS", None) + monkeypatch.delenv("GVL_NUM_THREADS", raising=False) + monkeypatch.setenv("GVL_FORCE_PARALLEL", val) + _constrain_detected_cpus(monkeypatch, 4) + # Not forced → normal size gate applies. + assert th.should_parallelize(0) is False + assert th.should_parallelize(4 * th._MIN_BYTES_PER_THREAD) is True + + +def test_force_parallel_unset(monkeypatch): + monkeypatch.delenv("GVL_FORCE_PARALLEL", raising=False) + monkeypatch.setattr(th, "_NUM_THREADS", None) + monkeypatch.delenv("GVL_NUM_THREADS", raising=False) + _constrain_detected_cpus(monkeypatch, 4) + assert th.should_parallelize(0) is False + + +def test_cgroup_quota_v2_parses_cpu_max(monkeypatch, tmp_path): + f = tmp_path / "cpu.max" + f.write_text("1530000 100000\n") # 15.3 cores + monkeypatch.setattr(th, "_CGROUP_V2_CPU_MAX", f) + monkeypatch.setattr(th, "_CGROUP_V1_QUOTA", tmp_path / "nope_quota") + monkeypatch.setattr(th, "_CGROUP_V1_PERIOD", tmp_path / "nope_period") + assert th._cgroup_cpu_quota() == math.ceil(1530000 / 100000) # 16 + + +def test_cgroup_quota_v2_max_is_none(monkeypatch, tmp_path): + f = tmp_path / "cpu.max" + f.write_text("max 100000\n") # unlimited + monkeypatch.setattr(th, "_CGROUP_V2_CPU_MAX", f) + monkeypatch.setattr(th, "_CGROUP_V1_QUOTA", tmp_path / "nope_quota") + monkeypatch.setattr(th, "_CGROUP_V1_PERIOD", tmp_path / "nope_period") + assert th._cgroup_cpu_quota() is None + + +def test_cgroup_quota_v1_fallback(monkeypatch, tmp_path): + q = tmp_path / "cfs_quota_us" + p = tmp_path / "cfs_period_us" + q.write_text("800000\n") + p.write_text("100000\n") + monkeypatch.setattr(th, "_CGROUP_V2_CPU_MAX", tmp_path / "absent") + monkeypatch.setattr(th, "_CGROUP_V1_QUOTA", q) + monkeypatch.setattr(th, "_CGROUP_V1_PERIOD", p) + assert th._cgroup_cpu_quota() == 8 + + +def test_cgroup_quota_none_when_absent(monkeypatch, tmp_path): + monkeypatch.setattr(th, "_CGROUP_V2_CPU_MAX", tmp_path / "absent1") + monkeypatch.setattr(th, "_CGROUP_V1_QUOTA", tmp_path / "absent2") + monkeypatch.setattr(th, "_CGROUP_V1_PERIOD", tmp_path / "absent3") + assert th._cgroup_cpu_quota() is None + + +def test_detect_cpus_takes_min_of_affinity_and_quota(monkeypatch, tmp_path): + _constrain_detected_cpus(monkeypatch, 16) # affinity reports 16 + monkeypatch.setattr(th, "_cgroup_cpu_quota", lambda: 15) + assert th._detect_cpus() == 15 + + +def test_detect_cpus_ignores_quota_when_none(monkeypatch): + _constrain_detected_cpus(monkeypatch, 16) + monkeypatch.setattr(th, "_cgroup_cpu_quota", lambda: None) + assert th._detect_cpus() == 16 + + +def test_cap_threads_overwrites_ambient_rayon(monkeypatch): + # An ambient RAYON_NUM_THREADS (base image) must NOT win over GVL's count. + monkeypatch.setenv("RAYON_NUM_THREADS", "16") + monkeypatch.setattr(th, "_NUM_THREADS", None) + monkeypatch.setenv("GVL_NUM_THREADS", "4") + n = th.cap_threads() + assert n == 4 + assert os.environ["RAYON_NUM_THREADS"] == "4" + + +def test_cap_threads_sets_when_unset(monkeypatch): + monkeypatch.delenv("RAYON_NUM_THREADS", raising=False) + monkeypatch.setattr(th, "_NUM_THREADS", None) + monkeypatch.setenv("GVL_NUM_THREADS", "3") + th.cap_threads() + assert os.environ["RAYON_NUM_THREADS"] == "3"