Skip to content

Parallelize chunked matrix building approach (#818)#821

Open
juaristi22 wants to merge 5 commits intomainfrom
maria/parallelizing
Open

Parallelize chunked matrix building approach (#818)#821
juaristi22 wants to merge 5 commits intomainfrom
maria/parallelizing

Conversation

@juaristi22
Copy link
Copy Markdown
Collaborator

@juaristi22 juaristi22 commented Apr 23, 2026

Fix #818.

Summary

Parallelizes UnifiedMatrixBuilder.build_matrix_chunked() across Modal workers behind an opt-in flag. Extracts the ~450-line inline chunked-matrix logic into a ChunkedMatrixAssembler coordinator class, replaces the list-then-concat final assembly with a two-pass streaming CSR build, and adds a Modal dispatch layer that fans per-chunk work across up to 50 workers.

The parallel path is opt-in (--parallel in combination with --chunked-matrix). Existing serial and non-chunked paths remain the default and are unchanged.

What changed

Phase 1 — refactor behind a coordinator (7a24547)

  • New policyengine_us_data/calibration/chunked_matrix_assembler.py:
    • SharedBuildState, ChunkPlan, ChunkResult dataclasses.
    • partition_chunks() pure helper.
    • stream_csr_from_shards() — two-pass streaming CSR assembly: pass 1 counts per-row nonzeros to compute indptr; pass 2 scatters entries into preallocated data/indices arrays. No COO intermediate, no scipy-internal doubling.
    • ChunkedMatrixAssembler class with run_chunks, run_single_chunk, assemble_final.
  • UnifiedMatrixBuilder.build_matrix_chunked() shrinks from ~450 lines to an ~80-line facade; public signature unchanged. Per-chunk loop and list-then-concat final assembly removed.
  • _build_entity_relationship lifted to module scope so SharedBuildState stays pickle-clean for cross-process dispatch.

Phase 2 — Modal dispatch (0846a54, 1607259, 9429aaf, 81268f3)

  • New policyengine_us_data/calibration/chunked_matrix_modal.py — coordinator-side dispatch. Pickles SharedBuildState to the pipeline volume, partitions chunk ids into contiguous batches, spawns workers via modal.Function.from_name, aggregates per-chunk errors, streams the final CSR after workers finish.
  • New modal_app/matrix_chunk_worker.pybuild_matrix_chunk_worker @app.function registered on _calibration_app. pipeline.py imports the worker and calls app.include(_calibration_app), so a single modal deploy modal_app/pipeline.py registers it under policyengine-us-data-pipeline.
  • UnifiedMatrixBuilder.build_matrix_chunked() gains parallel, num_matrix_workers, run_id kwargs — all default off/50/empty.
  • unified_calibration CLI gains --parallel / --no-parallel and --num-matrix-workers. run_id threads through the POLICYENGINE_US_DATA_RUN_ID env var set by build_package_remote.
  • build_package_remote and _build_package_impl accept and forward chunked_matrix, chunk_size, parallel_matrix, num_matrix_workers to the unified_calibration subprocess.
  • modal_app/pipeline.py: run_pipeline gains matching kwargs and forwards them to build_package_remote.remote().
  • .github/workflows/pipeline.yaml: four new workflow_dispatch inputs — chunked_matrix, chunk_size, parallel_matrix, num_matrix_workers — all defaulting to off/25000/off/50.

Memory improvement

Streaming CSR assembly replaces the three-list-concat that previously held every shard in memory before building the CSR. Measured on a 5M-nnz synthetic fixture: peak RSS during assembly is 1.22× final CSR size (46.6 MiB RSS delta vs. 38.2 MiB final CSR bytes), compared with an estimated 2-3× for the old path.

Behavior by flag combination

chunked_matrix parallel_matrix Path
false false Non-chunked build_matrix (unchanged)
true false In-process serial chunked (identical to pre-PR --chunked-matrix)
false true Non-chunked; info log that --parallel was ignored
true true Modal dispatch, up to num_matrix_workers workers

Test coverage

Layer File Tests
Pure helpers + streaming CSR + assembler orchestration tests/unit/calibration/test_chunked_matrix_assembler.py 11
Dispatch layer with fake worker + fake volume tests/unit/calibration/test_chunked_matrix_modal.py 9
Fixture-scale byte-equivalence (chunked CSR ≡ non-chunked CSR) tests/integration/test_chunked_matrix_builder.py 6 (unchanged, pass against the facade)
Env-gated Modal lookup smoke tests/integration/test_matrix_chunk_worker_modal.py 2 (skipped unless MODAL_TOKEN_ID + MODAL_TOKEN_SECRET + POLICYENGINE_US_DATA_MODAL_SMOKE=1)

Full calibration unit suite: 392 passed, 7 skipped, 0 failed (up from 382 before phase 2).

Post-merge validation

The next automatic pipeline run (triggered by push.yaml's auto-generated Update package version commit) uses the default flags (off) and behaves identically to main today. To exercise the parallel path as a one-off validation run:

gh workflow run pipeline.yaml --ref main \
  -f chunked_matrix=true \
  -f parallel_matrix=true \
  -f num_matrix_workers=50

IMPORTANT: Do not trigger the pipeline run while a production job is running without specifying an alternative Modal environment as it might affect its processes.

Outstanding

  • Pre-merge Modal deploy smoke not yet run — blocked on an in-flight production run on the same Modal environment.
  • First real end-to-end parallel run will be the post-merge manual dispatch above.
  • Full-CPS wall-time measurement will be added here after that run.

Related

  • Depends on Pipeline environment fixes #819 (Modal venv-activation fixes) — merged, now on main.
  • Implementation plan: Chunked Matrix Parallelization Plan.md at repo root.
  • Long-term refactor this aligns with: US Data Pipeline Refactor.md Phase 4 (MatrixAssembler extraction).

Test plan

  • pytest tests/unit/calibration/ tests/integration/test_chunked_matrix_builder.py — 398 pass (392 unit + 6 integration), 7 skipped, 0 failed.
  • ruff check and ruff format --check clean on all changed files.
  • modal deploy modal_app/pipeline.py succeeds and registers build_matrix_chunk_worker under policyengine-us-data-pipeline.
  • POLICYENGINE_US_DATA_MODAL_SMOKE=1 pytest tests/integration/test_matrix_chunk_worker_modal.py — both tests pass.
  • One-off gh workflow run pipeline.yaml dispatch with the parallel flags completes end-to-end; output CSR matches a serial baseline; wall time documented above.

🤖 Generated with Claude Code

@juaristi22 juaristi22 marked this pull request as draft April 23, 2026 10:13
juaristi22 and others added 2 commits April 23, 2026 18:26
Phase 1 of issue #818: refactor `UnifiedMatrixBuilder.build_matrix_chunked`
behind a coordinator class so a later commit can parallelize per-chunk
work across Modal workers.

Changes:
- New `policyengine_us_data/calibration/chunked_matrix_assembler.py`
  with `SharedBuildState`, `ChunkPlan`, `ChunkResult` dataclasses and
  a `ChunkedMatrixAssembler` class exposing `run_chunks`,
  `run_single_chunk`, and `assemble_final`. The per-chunk body
  (H5 materialization, per-chunk `Microsimulation`, variable
  calculation, COO shard write) moves into `run_single_chunk`.
- Replace the list-then-concat final-assembly block with a two-pass
  streaming CSR build: pass 1 counts per-row nnz across shards to
  compute `indptr`; pass 2 scatters entries into preallocated
  `data`/`indices` arrays, avoiding the scipy COO->CSR memory peak.
  Measured peak RSS on a 5M-nnz synthetic fixture is 1.22x the final
  CSR, vs. the 2-3x peak the old path reintroduced.
- `build_matrix_chunked` becomes a ~80-line facade; target querying,
  uprating, constraint extraction, and manifest handling stay on
  `UnifiedMatrixBuilder`. Public signature unchanged; all 6 existing
  chunked-matrix integration tests pass without edits.
- Lift `_build_entity_relationship` to module scope as
  `build_entity_relationship(sim)` so `SharedBuildState` stays
  pickle-clean for cross-process dispatch in phase 2. The memoizing
  wrapper on `UnifiedMatrixBuilder` remains for non-chunked callers.
- 10 new unit tests in `tests/unit/calibration/test_chunked_matrix_assembler.py`
  cover partition correctness, streaming CSR (including a memory
  bound), resume-skip, range-mismatch rejection, and dispatcher
  routing via `run_chunks`.

The Modal parallel-dispatch function, `build_matrix_chunk_worker`
Modal function, and `--parallel` / `--num-matrix-workers` CLI flags
will land in the next commit (phase 2).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Phase 2 of issue #818. Builds on the phase-1 coordinator extraction
to fan chunked matrix building out across Modal workers, cutting the
full-CPS build's wall time from the ~14 h serial ceiling into a
wall-time window proportional to one chunk.

Changes:
- `policyengine_us_data/calibration/chunked_matrix_modal.py`: new
  `dispatch_chunks_modal` coordinator. Pickles `SharedBuildState` to
  `{chunk_root}/chunk_build_state.pkl` on the pipeline volume,
  partitions `range(n_chunks)` into contiguous batches, spawns one
  `build_matrix_chunk_worker` per batch via `modal.Function.from_name`,
  collects results, aggregates per-chunk errors, and streams the
  final CSR from shards on the volume. Contiguous batching (not
  round-robin) keeps resume-friendly prefixes on disk.
- `modal_app/matrix_chunk_worker.py`: new `build_matrix_chunk_worker`
  `@app.function` registered on the existing
  `policyengine-us-data-fit-weights` app (same app as
  `build_package_remote`). Mirrors `build_areas_worker` resources
  (memory=16384, cpu=1.0, timeout=28800, max_containers=50,
  nonpreemptible). Worker reads the pickled shared state, constructs
  a `ChunkedMatrixAssembler` with `resume=True`, calls `run_chunks`
  on its batch, commits the volume, returns nnz + errors.
- `SharedBuildState` grows a `chunk_size` field so a worker can
  reconstruct the assembler from the pickle alone (no extra args).
- `build_matrix_chunked` accepts `parallel`, `num_matrix_workers`,
  `run_id`; routes to `dispatch_chunks_modal` when `parallel=True`,
  preserves the in-process serial path otherwise.
- `unified_calibration` CLI gains `--parallel` (default off) and
  `--num-matrix-workers` (default 50). `--parallel` without
  `--chunked-matrix` logs an info message and runs the non-chunked
  path unchanged. `run_id` flows from env
  `POLICYENGINE_US_DATA_RUN_ID`, set by `build_package_remote`.
- `build_package_remote` / `_build_package_impl` take
  `chunked_matrix`, `chunk_size`, `parallel_matrix`,
  `num_matrix_workers` kwargs and forward them to the
  `unified_calibration` subprocess.

Tests:
- 9 unit tests in `tests/unit/calibration/test_chunked_matrix_modal.py`
  cover `partition_chunk_ids_contiguous` (exact division, remainder,
  more workers than chunks, zero chunks, invalid num_workers) and
  `dispatch_chunks_modal` with injected `worker_function` + `volume`
  fakes (spawn/assemble happy path, zero-chunks short-circuit, error
  aggregation, shared-state pickle side effect).
- New `test_shared_build_state_roundtrips_pickle` in the assembler
  unit tests guards the phase-2 boundary: if `SharedBuildState`
  stops pickling cleanly, we catch it here rather than in a Modal
  worker. 11 assembler unit tests total.
- `tests/integration/test_matrix_chunk_worker_modal.py` is an
  env-gated smoke test (MODAL_TOKEN_ID + MODAL_TOKEN_SECRET +
  POLICYENGINE_US_DATA_MODAL_SMOKE=1) that validates the deployed
  worker is lookupable via `modal.Function.from_name` and that
  production-scale batching is sane. Full end-to-end validation
  (write shared state, spawn, verify shards on the volume) is a
  pre-merge manual step documented in the PR.

Verified:
- 392 calibration unit tests pass (up from 382; 10 new phase-2
  tests; 0 regressions), plus the 6 existing chunked-matrix
  integration tests still green against the phase-1 facade.
- `ruff check` and `ruff format --check` clean on all changed files.

Validation still pending on real Modal (deploy + one-chunk
benchmark + full-CPS `workflow_dispatch` run); gated on the Modal
venv-activation PR landing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@juaristi22 juaristi22 force-pushed the maria/parallelizing branch from 3de7d81 to 0846a54 Compare April 23, 2026 13:00
juaristi22 and others added 3 commits April 23, 2026 19:01
Closes the wiring gap between issue #818's CLI flags and the
pipeline orchestrator. `run_pipeline` now accepts `chunked_matrix`,
`chunk_size`, `parallel_matrix`, and `num_matrix_workers` kwargs
and forwards them to `build_package_remote.remote()`. All four
default off/50, so the auto-triggered pipeline run on the next
`Update package version` commit after this PR merges will continue
to use the existing non-chunked path.

`pipeline.yaml`'s `workflow_dispatch` exposes the same four knobs
(defaults also off/50). To trigger the one-off chunked-parallel
validation run after this PR merges:

    gh workflow run pipeline.yaml \
      -f chunked_matrix=true \
      -f parallel_matrix=true \
      -f num_matrix_workers=50

Subsequent automatic pipeline runs (on version-bump commits) pick
up the defaults and stay non-chunked until someone dispatches
manually with the flags on again.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Without this import, ``modal deploy modal_app/pipeline.py`` (run by
``pipeline.yaml`` on dispatch) would skip ``build_matrix_chunk_worker``
because it lives in its own module — ``_calibration_app`` only knows
about functions that have been loaded. Importing the worker here runs
its ``@app.function`` decorator at module load, so
``app.include(_calibration_app)`` picks it up and the worker is
deployed alongside ``build_package_remote``. Without it,
``modal.Function.from_name`` in ``dispatch_chunks_modal`` would fail
at runtime on the first parallel-matrix attempt.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
``dispatch_chunks_modal._lookup_worker_function`` was looking the
worker up under ``policyengine-us-data-fit-weights`` — the name
declared on ``_calibration_app`` in
``modal_app/remote_calibration_runner.py`` where the worker's
``@app.function`` decorator attaches at the Python level. That is
not the name the function is registered under in Modal's registry.

``modal_app/pipeline.py`` merges the fit-weights sub-app into the
pipeline app via ``app.include(_calibration_app)`` before deploy.
After ``modal deploy modal_app/pipeline.py`` (what both the
``pipeline.yaml`` dispatch step and the ``pr.yaml`` preview step
run), Modal's registry only knows about
``policyengine-us-data-pipeline`` — there is no independent
``policyengine-us-data-fit-weights`` entry. A ``Function.from_name``
call with the sub-app's name would raise at runtime on the first
parallel-matrix invocation.

Caught by inspecting ``modal app list`` on a deploy: the fit-weights
app is absent; only ``policyengine-us-data-pipeline`` (and the
dataset-only ``policyengine-us-data`` ephemeral from
``push.yaml``'s ``modal run modal_app/data_build.py``) appear.

Changes:
- ``MODAL_APP_NAME`` now ``policyengine-us-data-pipeline``, with a
  comment pointing at the ``app.include`` hop that makes this
  necessary.
- ``matrix_chunk_worker.py`` docstring rewritten to explain the
  two-level naming (Python-object name vs registry name after
  ``app.include``).
- ``test_matrix_chunk_worker_modal.py`` looks up under the pipeline
  app name and updates the "Deploy first with" instruction to
  ``modal deploy modal_app/pipeline.py`` (the correct deploy path —
  the previous instruction would have created an orphan fit-weights
  registration that production code never looks up).

Unit tests (which inject a fake ``worker_function``) are unaffected
and still pass. The env-gated integration smoke was previously
guaranteed to fail on first real deploy; it can now pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@juaristi22 juaristi22 marked this pull request as ready for review April 23, 2026 16:58
@juaristi22 juaristi22 requested a review from anth-volk April 23, 2026 16:58
@juaristi22
Copy link
Copy Markdown
Collaborator Author

Ready for review but not merge until smoke testing modal deployment.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Parallelize chunked matrix building behind a coordinator class (follow-up to #753)

1 participant