Parallelize chunked matrix building approach (#818) #821
Open
juaristi22 wants to merge 5 commits into main
Phase 1 of issue #818: refactor `UnifiedMatrixBuilder.build_matrix_chunked` behind a coordinator class so a later commit can parallelize per-chunk work across Modal workers.

Changes:

- New `policyengine_us_data/calibration/chunked_matrix_assembler.py` with `SharedBuildState`, `ChunkPlan`, `ChunkResult` dataclasses and a `ChunkedMatrixAssembler` class exposing `run_chunks`, `run_single_chunk`, and `assemble_final`. The per-chunk body (H5 materialization, per-chunk `Microsimulation`, variable calculation, COO shard write) moves into `run_single_chunk`.
- Replace the list-then-concat final-assembly block with a two-pass streaming CSR build: pass 1 counts per-row nnz across shards to compute `indptr`; pass 2 scatters entries into preallocated `data`/`indices` arrays, avoiding the scipy COO->CSR memory peak. Measured peak RSS on a 5M-nnz synthetic fixture is 1.22x the final CSR, vs. the 2-3x peak the old path reintroduced.
- `build_matrix_chunked` becomes a ~80-line facade; target querying, uprating, constraint extraction, and manifest handling stay on `UnifiedMatrixBuilder`. Public signature unchanged; all 6 existing chunked-matrix integration tests pass without edits.
- Lift `_build_entity_relationship` to module scope as `build_entity_relationship(sim)` so `SharedBuildState` stays pickle-clean for cross-process dispatch in phase 2. The memoizing wrapper on `UnifiedMatrixBuilder` remains for non-chunked callers.
- 10 new unit tests in `tests/unit/calibration/test_chunked_matrix_assembler.py` cover partition correctness, streaming CSR (including a memory bound), resume-skip, range-mismatch rejection, and dispatcher routing via `run_chunks`.

The Modal parallel-dispatch function, `build_matrix_chunk_worker` Modal function, and `--parallel` / `--num-matrix-workers` CLI flags will land in the next commit (phase 2).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
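The partition and range-mismatch behavior covered by the phase-1 unit tests might look roughly like the sketch below. It is an illustration under assumptions, not the code in this PR: `ChunkPlanSketch`, its fields, `partition_records`, and `check_shard_range` are hypothetical names, and the real `ChunkPlan` may carry different data.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class ChunkPlanSketch:
    """Hypothetical stand-in for ChunkPlan: one half-open record range."""

    chunk_id: int
    start: int  # inclusive
    end: int  # exclusive


def partition_records(n_records: int, chunk_size: int) -> List[ChunkPlanSketch]:
    """Split range(n_records) into consecutive chunks of at most chunk_size."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if n_records < 0:
        raise ValueError("n_records must be non-negative")
    plans = []
    for chunk_id, start in enumerate(range(0, n_records, chunk_size)):
        end = min(start + chunk_size, n_records)
        plans.append(ChunkPlanSketch(chunk_id, start, end))
    return plans


def check_shard_range(plan: ChunkPlanSketch, shard_start: int, shard_end: int) -> None:
    """Reject a shard on disk whose recorded range disagrees with the plan."""
    if (shard_start, shard_end) != (plan.start, plan.end):
        raise ValueError(
            f"chunk {plan.chunk_id}: shard covers [{shard_start}, {shard_end}) "
            f"but plan expects [{plan.start}, {plan.end})"
        )


plans = partition_records(n_records=10, chunk_size=4)
```

Half-open ranges make it easy to check that the chunks tile the input exactly: each chunk's `end` equals the next chunk's `start`, and the last `end` equals `n_records`.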
Phase 2 of issue #818. Builds on the phase-1 coordinator extraction to fan chunked matrix building out across Modal workers, cutting the full-CPS build's wall time from the ~14 h serial ceiling into a wall-time window proportional to one chunk.

Changes:

- `policyengine_us_data/calibration/chunked_matrix_modal.py`: new `dispatch_chunks_modal` coordinator. Pickles `SharedBuildState` to `{chunk_root}/chunk_build_state.pkl` on the pipeline volume, partitions `range(n_chunks)` into contiguous batches, spawns one `build_matrix_chunk_worker` per batch via `modal.Function.from_name`, collects results, aggregates per-chunk errors, and streams the final CSR from shards on the volume. Contiguous batching (not round-robin) keeps resume-friendly prefixes on disk.
- `modal_app/matrix_chunk_worker.py`: new `build_matrix_chunk_worker` `@app.function` registered on the existing `policyengine-us-data-fit-weights` app (same app as `build_package_remote`). Mirrors `build_areas_worker` resources (memory=16384, cpu=1.0, timeout=28800, max_containers=50, nonpreemptible). Worker reads the pickled shared state, constructs a `ChunkedMatrixAssembler` with `resume=True`, calls `run_chunks` on its batch, commits the volume, returns nnz + errors.
- `SharedBuildState` grows a `chunk_size` field so a worker can reconstruct the assembler from the pickle alone (no extra args).
- `build_matrix_chunked` accepts `parallel`, `num_matrix_workers`, `run_id`; routes to `dispatch_chunks_modal` when `parallel=True`, preserves the in-process serial path otherwise.
- `unified_calibration` CLI gains `--parallel` (default off) and `--num-matrix-workers` (default 50). `--parallel` without `--chunked-matrix` logs an info message and runs the non-chunked path unchanged. `run_id` flows from env `POLICYENGINE_US_DATA_RUN_ID`, set by `build_package_remote`.
- `build_package_remote` / `_build_package_impl` take `chunked_matrix`, `chunk_size`, `parallel_matrix`, `num_matrix_workers` kwargs and forward them to the `unified_calibration` subprocess.

Tests:

- 9 unit tests in `tests/unit/calibration/test_chunked_matrix_modal.py` cover `partition_chunk_ids_contiguous` (exact division, remainder, more workers than chunks, zero chunks, invalid num_workers) and `dispatch_chunks_modal` with injected `worker_function` + `volume` fakes (spawn/assemble happy path, zero-chunks short-circuit, error aggregation, shared-state pickle side effect).
- New `test_shared_build_state_roundtrips_pickle` in the assembler unit tests guards the phase-2 boundary: if `SharedBuildState` stops pickling cleanly, we catch it here rather than in a Modal worker. 11 assembler unit tests total.
- `tests/integration/test_matrix_chunk_worker_modal.py` is an env-gated smoke test (MODAL_TOKEN_ID + MODAL_TOKEN_SECRET + POLICYENGINE_US_DATA_MODAL_SMOKE=1) that validates the deployed worker is lookupable via `modal.Function.from_name` and that production-scale batching is sane. Full end-to-end validation (write shared state, spawn, verify shards on the volume) is a pre-merge manual step documented in the PR.

Verified:

- 392 calibration unit tests pass (up from 382; 10 new phase-2 tests; 0 regressions), plus the 6 existing chunked-matrix integration tests still green against the phase-1 facade.
- `ruff check` and `ruff format --check` clean on all changed files.

Validation still pending on real Modal (deploy + one-chunk benchmark + full-CPS `workflow_dispatch` run); gated on the Modal venv-activation PR landing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
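For readers unfamiliar with contiguous batching, the following sketch shows one way to split chunk ids across workers while covering the same edge cases the unit tests list (exact division, remainder, more workers than chunks, zero chunks, invalid worker count). The function name `partition_contiguous` and the choice to give the earliest batches the remainder are assumptions; the PR's `partition_chunk_ids_contiguous` may distribute the remainder differently.

```python
from typing import List


def partition_contiguous(n_chunks: int, num_workers: int) -> List[List[int]]:
    """Split range(n_chunks) into at most num_workers contiguous batches."""
    if num_workers <= 0:
        raise ValueError("num_workers must be positive")
    if n_chunks <= 0:
        return []
    # Never create more batches than there are chunks (no empty batches).
    n_batches = min(num_workers, n_chunks)
    base, extra = divmod(n_chunks, n_batches)
    batches = []
    start = 0
    for i in range(n_batches):
        # Assumption: the first `extra` batches each take one additional chunk.
        size = base + (1 if i < extra else 0)
        batches.append(list(range(start, start + size)))
        start += size
    return batches


batches = partition_contiguous(n_chunks=10, num_workers=3)
```

Because every batch is a run of consecutive ids, a worker that stops partway leaves a contiguous prefix of finished shards for its batch, which is the resume-friendly property the commit message refers to. A round-robin split would interleave ids across workers instead.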
3de7d81 to
0846a54
Compare
Closes the wiring gap between issue #818's CLI flags and the pipeline orchestrator.

`run_pipeline` now accepts `chunked_matrix`, `chunk_size`, `parallel_matrix`, and `num_matrix_workers` kwargs and forwards them to `build_package_remote.remote()`. All four default off/50, so the auto-triggered pipeline run on the next `Update package version` commit after this PR merges will continue to use the existing non-chunked path.

`pipeline.yaml`'s `workflow_dispatch` exposes the same four knobs (defaults also off/50). To trigger the one-off chunked-parallel validation run after this PR merges:

    gh workflow run pipeline.yaml \
      -f chunked_matrix=true \
      -f parallel_matrix=true \
      -f num_matrix_workers=50

Subsequent automatic pipeline runs (on version-bump commits) pick up the defaults and stay non-chunked until someone dispatches manually with the flags on again.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
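As a rough illustration of why the defaults leave the existing path untouched, the forwarding step can be thought of as translating kwargs into CLI arguments for the `unified_calibration` subprocess. This is a sketch under assumptions: `calibration_cli_args` is a hypothetical helper, `--chunk-size` is an assumed flag name, and the default of 25000 for `chunk_size` is taken from the `workflow_dispatch` defaults listed in the PR description rather than from the code itself.

```python
from typing import List


def calibration_cli_args(
    chunked_matrix: bool = False,
    chunk_size: int = 25_000,  # assumed default, see lead-in
    parallel_matrix: bool = False,
    num_matrix_workers: int = 50,
) -> List[str]:
    """Translate pipeline kwargs into extra CLI arguments (hypothetical helper)."""
    args: List[str] = []
    if chunked_matrix:
        # `--chunk-size` is an assumed flag name, not confirmed by the PR text.
        args += ["--chunked-matrix", "--chunk-size", str(chunk_size)]
    if parallel_matrix:
        args += ["--parallel", "--num-matrix-workers", str(num_matrix_workers)]
    return args


default_args = calibration_cli_args()
validation_args = calibration_cli_args(chunked_matrix=True, parallel_matrix=True)
```

With every kwarg at its default the helper adds nothing to the command line, so an automatic run would invoke the subprocess exactly as before.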
Without this import, ``modal deploy modal_app/pipeline.py`` (run by ``pipeline.yaml`` on dispatch) would skip ``build_matrix_chunk_worker`` because it lives in its own module — ``_calibration_app`` only knows about functions that have been loaded. Importing the worker here runs its ``@app.function`` decorator at module load, so ``app.include(_calibration_app)`` picks it up and the worker is deployed alongside ``build_package_remote``. Without it, ``modal.Function.from_name`` in ``dispatch_chunks_modal`` would fail at runtime on the first parallel-matrix attempt. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
``dispatch_chunks_modal._lookup_worker_function`` was looking the worker up under ``policyengine-us-data-fit-weights``, the name declared on ``_calibration_app`` in ``modal_app/remote_calibration_runner.py`` where the worker's ``@app.function`` decorator attaches at the Python level. That is not the name the function is registered under in Modal's registry.

``modal_app/pipeline.py`` merges the fit-weights sub-app into the pipeline app via ``app.include(_calibration_app)`` before deploy. After ``modal deploy modal_app/pipeline.py`` (what both the ``pipeline.yaml`` dispatch step and the ``pr.yaml`` preview step run), Modal's registry only knows about ``policyengine-us-data-pipeline``; there is no independent ``policyengine-us-data-fit-weights`` entry. A ``Function.from_name`` call with the sub-app's name would raise at runtime on the first parallel-matrix invocation.

Caught by inspecting ``modal app list`` on a deploy: the fit-weights app is absent; only ``policyengine-us-data-pipeline`` (and the dataset-only ``policyengine-us-data`` ephemeral from ``push.yaml``'s ``modal run modal_app/data_build.py``) appear.

Changes:

- ``MODAL_APP_NAME`` now ``policyengine-us-data-pipeline``, with a comment pointing at the ``app.include`` hop that makes this necessary.
- ``matrix_chunk_worker.py`` docstring rewritten to explain the two-level naming (Python-object name vs registry name after ``app.include``).
- ``test_matrix_chunk_worker_modal.py`` looks up under the pipeline app name and updates the "Deploy first with" instruction to ``modal deploy modal_app/pipeline.py`` (the correct deploy path; the previous instruction would have created an orphan fit-weights registration that production code never looks up).

Unit tests (which inject a fake ``worker_function``) are unaffected and still pass. The env-gated integration smoke was previously guaranteed to fail on first real deploy; it can now pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
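The two-level naming can be illustrated with a toy model. This is not Modal's implementation or API: ``ToyApp``, ``REGISTRY``, ``deploy``, and ``from_name`` are hypothetical stand-ins that only mimic the behavior described above, where a function attached to a sub-app becomes reachable under the including app's name once that app is deployed.

```python
from typing import Callable, Dict, List


class ToyApp:
    """Toy stand-in for an app object; not Modal's App class."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.functions: Dict[str, Callable] = {}

    def function(self, fn: Callable) -> Callable:
        # Runs when the defining module is loaded, attaching fn to this app.
        self.functions[fn.__name__] = fn
        return fn

    def include(self, other: "ToyApp") -> None:
        # Merge the other app's functions into this one; the other app's
        # name is not carried over.
        self.functions.update(other.functions)


# Toy registry keyed by the name of whichever app was deployed.
REGISTRY: Dict[str, Dict[str, Callable]] = {}


def deploy(app: ToyApp) -> None:
    REGISTRY[app.name] = dict(app.functions)


def from_name(app_name: str, function_name: str) -> Callable:
    return REGISTRY[app_name][function_name]


calibration_app = ToyApp("policyengine-us-data-fit-weights")
pipeline_app = ToyApp("policyengine-us-data-pipeline")


@calibration_app.function
def build_matrix_chunk_worker(chunk_ids: List[int]) -> int:
    return len(chunk_ids)


pipeline_app.include(calibration_app)
deploy(pipeline_app)  # only the pipeline app is ever deployed

worker = from_name("policyengine-us-data-pipeline", "build_matrix_chunk_worker")
```

In this toy model a lookup under ``policyengine-us-data-fit-weights`` raises ``KeyError`` because that name was never deployed, which mirrors the failure mode this commit fixes.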
Ready for review, but do not merge until the Modal deployment has been smoke-tested.
Fix #818.
Summary
Parallelizes `UnifiedMatrixBuilder.build_matrix_chunked()` across Modal workers behind an opt-in flag. Extracts the ~450-line inline chunked-matrix logic into a `ChunkedMatrixAssembler` coordinator class, replaces the list-then-concat final assembly with a two-pass streaming CSR build, and adds a Modal dispatch layer that fans per-chunk work across up to 50 workers.

The parallel path is opt-in (`--parallel` in combination with `--chunked-matrix`). Existing serial and non-chunked paths remain the default and are unchanged.

What changed
Phase 1: refactor behind a coordinator (7a24547)

- `policyengine_us_data/calibration/chunked_matrix_assembler.py`:
  - `SharedBuildState`, `ChunkPlan`, `ChunkResult` dataclasses.
  - `partition_chunks()` pure helper.
  - `stream_csr_from_shards()`: two-pass streaming CSR assembly. Pass 1 counts per-row nonzeros to compute `indptr`; pass 2 scatters entries into preallocated `data`/`indices` arrays. No COO intermediate, no scipy-internal doubling.
  - `ChunkedMatrixAssembler` class with `run_chunks`, `run_single_chunk`, `assemble_final`.
- `UnifiedMatrixBuilder.build_matrix_chunked()` shrinks from ~450 lines to an ~80-line facade; public signature unchanged. Per-chunk loop and list-then-concat final assembly removed.
- `_build_entity_relationship` lifted to module scope so `SharedBuildState` stays pickle-clean for cross-process dispatch.

Phase 2: Modal dispatch (0846a54, 1607259, 9429aaf, 81268f3)

- `policyengine_us_data/calibration/chunked_matrix_modal.py`: coordinator-side dispatch. Pickles `SharedBuildState` to the pipeline volume, partitions chunk ids into contiguous batches, spawns workers via `modal.Function.from_name`, aggregates per-chunk errors, streams the final CSR after workers finish.
- `modal_app/matrix_chunk_worker.py`: `build_matrix_chunk_worker` `@app.function` registered on `_calibration_app`. `pipeline.py` imports the worker and calls `app.include(_calibration_app)`, so a single `modal deploy modal_app/pipeline.py` registers it under `policyengine-us-data-pipeline`.
- `UnifiedMatrixBuilder.build_matrix_chunked()` gains `parallel`, `num_matrix_workers`, `run_id` kwargs, all defaulting to off/50/empty.
- `unified_calibration` CLI gains `--parallel` / `--no-parallel` and `--num-matrix-workers`. `run_id` threads through the `POLICYENGINE_US_DATA_RUN_ID` env var set by `build_package_remote`.
- `build_package_remote` and `_build_package_impl` accept and forward `chunked_matrix`, `chunk_size`, `parallel_matrix`, `num_matrix_workers` to the `unified_calibration` subprocess.
- `modal_app/pipeline.py`: `run_pipeline` gains matching kwargs and forwards them to `build_package_remote.remote()`.
- `.github/workflows/pipeline.yaml`: four new `workflow_dispatch` inputs (`chunked_matrix`, `chunk_size`, `parallel_matrix`, `num_matrix_workers`), all defaulting to off/25000/off/50.

Memory improvement
Streaming CSR assembly replaces the three-list-concat that previously held every shard in memory before building the CSR. Measured on a 5M-nnz synthetic fixture: peak RSS during assembly is 1.22× final CSR size (46.6 MiB RSS delta vs. 38.2 MiB final CSR bytes), compared with an estimated 2-3× for the old path.
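The two-pass idea can be sketched in a few lines. This is a minimal pure-Python illustration, not the PR's `stream_csr_from_shards`: the name `stream_csr_sketch`, the `(row, col, value)` shard layout, and the use of plain lists are assumptions made so the example is self-contained. The real implementation presumably reads shards from disk on each pass and fills NumPy arrays.

```python
from typing import List, Sequence, Tuple

# Assumed shard layout: a sequence of (row, col, value) triples.
Shard = Sequence[Tuple[int, int, float]]


def stream_csr_sketch(
    shards: Sequence[Shard], n_rows: int
) -> Tuple[List[float], List[int], List[int]]:
    """Build CSR (data, indices, indptr) from COO shards in two passes."""
    # Pass 1: count nonzeros per row, then prefix-sum the counts into indptr.
    indptr = [0] * (n_rows + 1)
    for shard in shards:
        for row, _col, _val in shard:
            indptr[row + 1] += 1
    for r in range(n_rows):
        indptr[r + 1] += indptr[r]

    # Pass 2: scatter each entry into preallocated arrays, tracking the next
    # free slot of every row with a write cursor.
    nnz = indptr[n_rows]
    data = [0.0] * nnz
    indices = [0] * nnz
    cursor = list(indptr[:-1])
    for shard in shards:
        for row, col, val in shard:
            pos = cursor[row]
            data[pos] = val
            indices[pos] = col
            cursor[row] += 1
    return data, indices, indptr


shards = [
    [(0, 1, 2.0), (2, 0, 5.0)],
    [(0, 3, 1.0), (1, 2, 4.0)],
]
data, indices, indptr = stream_csr_sketch(shards, n_rows=3)
```

Only one shard needs to be resident at a time in either pass, so the peak is the final `data`/`indices`/`indptr` arrays plus a single shard. Note that in this sketch the column indices within a row follow shard order and are not sorted afterwards.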
Behavior by flag combination
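| `chunked_matrix` | `parallel_matrix` | Behavior |
| --- | --- | --- |
| false | false | Non-chunked `build_matrix` (unchanged) |
| true | false | Serial in-process chunked path (existing `--chunked-matrix`) |
| false | true | Non-chunked path; an info message notes that `--parallel` was ignored |
| true | true | Chunked build fanned out across up to `num_matrix_workers` workers |

Test coverage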
- `tests/unit/calibration/test_chunked_matrix_assembler.py`: 11 unit tests.
- `tests/unit/calibration/test_chunked_matrix_modal.py`: 9 unit tests.
- `tests/integration/test_chunked_matrix_builder.py`: 6 existing integration tests.
- `tests/integration/test_matrix_chunk_worker_modal.py`: smoke tests, env-gated (`MODAL_TOKEN_ID` + `MODAL_TOKEN_SECRET` + `POLICYENGINE_US_DATA_MODAL_SMOKE=1`).

Full calibration unit suite: 392 passed, 7 skipped, 0 failed (up from 382 before phase 2).
Post-merge validation
The next automatic pipeline run (triggered by `push.yaml`'s auto-generated `Update package version` commit) uses the default flags (off) and behaves identically to main today. To exercise the parallel path as a one-off validation run, dispatch `pipeline.yaml` manually with `chunked_matrix` and `parallel_matrix` enabled.

IMPORTANT: Do not trigger the pipeline run while a production job is running unless you specify an alternative Modal environment, as it might interfere with the production job's processes.
Outstanding
Related
- `Chunked Matrix Parallelization Plan.md` at repo root.
- `US Data Pipeline Refactor.md` Phase 4 (`MatrixAssembler` extraction).

Test plan
- `pytest tests/unit/calibration/ tests/integration/test_chunked_matrix_builder.py`: 398 pass (392 unit + 6 integration), 7 skipped, 0 failed.
- `ruff check` and `ruff format --check` clean on all changed files.
- `modal deploy modal_app/pipeline.py` succeeds and registers `build_matrix_chunk_worker` under `policyengine-us-data-pipeline`.
- `POLICYENGINE_US_DATA_MODAL_SMOKE=1 pytest tests/integration/test_matrix_chunk_worker_modal.py`: both tests pass.
- `gh workflow run pipeline.yaml` dispatch with the parallel flags completes end-to-end; output CSR matches a serial baseline; wall time documented above.

🤖 Generated with Claude Code