Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions .github/workflows/pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,22 @@ on:
description: "Override version (default: read from pyproject.toml)"
default: ""
type: string
chunked_matrix:
description: "Build the calibration matrix in chunks (opt-in)"
default: false
type: boolean
chunk_size:
description: "Clone-household columns per chunk"
default: "25000"
type: string
parallel_matrix:
description: "Fan chunked matrix building across Modal workers"
default: false
type: boolean
num_matrix_workers:
description: "Number of Modal workers for parallel matrix build"
default: "50"
type: string

concurrency:
group: pipeline-main
Expand Down Expand Up @@ -68,6 +84,10 @@ jobs:
SKIP_NATIONAL="${{ inputs.skip_national || 'false' }}"
RESUME_RUN_ID="${{ inputs.resume_run_id || '' }}"
VERSION_OVERRIDE="${{ inputs.version_override || '' }}"
CHUNKED_MATRIX="${{ inputs.chunked_matrix || 'false' }}"
CHUNK_SIZE="${{ inputs.chunk_size || '25000' }}"
PARALLEL_MATRIX="${{ inputs.parallel_matrix || 'false' }}"
NUM_MATRIX_WORKERS="${{ inputs.num_matrix_workers || '50' }}"

python -c "
import modal
Expand All @@ -81,6 +101,10 @@ jobs:
skip_national='${SKIP_NATIONAL}' == 'true',
resume_run_id='${RESUME_RUN_ID}' or None,
version_override='${VERSION_OVERRIDE}' or '',
chunked_matrix='${CHUNKED_MATRIX}' == 'true',
chunk_size=int('${CHUNK_SIZE}'),
parallel_matrix='${PARALLEL_MATRIX}' == 'true',
num_matrix_workers=int('${NUM_MATRIX_WORKERS}'),
)
print(f'Pipeline spawned.')
print(f'Function call ID: {fc.object_id}')
Expand Down
3 changes: 3 additions & 0 deletions changelog.d/818.changed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Extract `ChunkedMatrixAssembler` from `UnifiedMatrixBuilder.build_matrix_chunked` and replace the list-then-concat final assembly with a two-pass streaming CSR build. The facade signature and all existing chunked-matrix behaviour are unchanged.

Parallelize chunked matrix building across Modal workers. Adds `dispatch_chunks_modal` (`policyengine_us_data/calibration/chunked_matrix_modal.py`) that pickles `SharedBuildState` to the pipeline volume, fans contiguous chunk-id batches to `build_matrix_chunk_worker` (`modal_app/matrix_chunk_worker.py`, registered on the `policyengine-us-data-fit-weights` app), and streams the final CSR from shards on the volume. New CLI flags on `unified_calibration`: `--parallel` (default off) and `--num-matrix-workers` (default 50). `build_package_remote` threads `run_id` to the subprocess via the `POLICYENGINE_US_DATA_RUN_ID` env var and forwards `--parallel` / `--num-matrix-workers` when `parallel_matrix=True`. `--parallel` without `--chunked-matrix` logs an info message and runs the non-chunked path unchanged.
119 changes: 119 additions & 0 deletions modal_app/matrix_chunk_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""Modal worker that materializes a batch of matrix chunks.

The ``@app.function`` decorator here attaches to ``_calibration_app``
(declared as ``policyengine-us-data-fit-weights`` in
``remote_calibration_runner.py``), alongside ``build_package_remote``.
At deploy time, ``modal_app/pipeline.py`` merges that app into the
pipeline app via ``app.include(_calibration_app)``, so after
``modal deploy modal_app/pipeline.py`` the function is registered
under ``policyengine-us-data-pipeline`` in Modal's registry — that's
the name ``dispatch_chunks_modal`` uses in its
``modal.Function.from_name`` lookup.

Each worker reads the shared ``ChunkedMatrixAssembler`` state from
``pipeline_volume``, materializes its assigned chunks to COO shard
files on the volume, and commits. The coordinator reads the shards
back after all workers finish and streams them into the final CSR
matrix.
"""

from __future__ import annotations

import pickle
import sys
import traceback
from pathlib import Path
from typing import Dict, List

# Make the package importable both inside the Modal image (where the
# repo is baked at /root/policyengine-us-data) and when running from a
# local checkout (the repo root, two levels up from this file).
_baked = "/root/policyengine-us-data"
_local = str(Path(__file__).resolve().parent.parent)
for _p in (_baked, _local):
    if _p in sys.path:
        continue
    sys.path.insert(0, _p)

from modal_app.images import cpu_image # noqa: E402
from modal_app.remote_calibration_runner import ( # noqa: E402
PIPELINE_MOUNT,
app,
hf_secret,
pipeline_vol,
)


def _chunk_root(run_id: str) -> str:
    """Return the volume path holding this run's matrix-build artifacts."""
    return "/".join((PIPELINE_MOUNT, "artifacts", run_id, "matrix_build"))


@app.function(
    image=cpu_image,
    secrets=[hf_secret],
    volumes={PIPELINE_MOUNT: pipeline_vol},
    memory=16384,
    cpu=1.0,
    timeout=28800,
    max_containers=50,
    nonpreemptible=True,
)
def build_matrix_chunk_worker(run_id: str, chunk_ids: List[int]) -> Dict:
    """Materialize ``chunk_ids`` from the pickled ``SharedBuildState``.

    Args:
        run_id: Pipeline run identifier; selects the volume path for
            this worker's shared state and shard output directory.
        chunk_ids: Chunk indices this worker is responsible for.

    Returns:
        Dict with ``chunk_ids``, ``nnz_per_chunk``, and ``errors``
        lists suitable for the coordinator to aggregate.
    """
    # Deferred import: only resolvable once sys.path points at the repo.
    from policyengine_us_data.calibration.chunked_matrix_assembler import (
        ChunkedMatrixAssembler,
    )

    # Pick up the coordinator's latest commit before reading state.
    pipeline_vol.reload()

    root = Path(_chunk_root(run_id))
    state_file = root / "chunk_build_state.pkl"
    if not state_file.exists():
        # No shared state means nothing can be built; report a single
        # batch-level error covering every assigned chunk.
        return {
            "chunk_ids": list(chunk_ids),
            "nnz_per_chunk": [],
            "errors": [
                {
                    "chunk_ids": list(chunk_ids),
                    "error": f"Missing shared state at {state_file}",
                }
            ],
        }

    with open(state_file, "rb") as handle:
        state = pickle.load(handle)

    assembler = ChunkedMatrixAssembler(
        shared_state=state,
        chunk_root=root,
        chunk_size=state.chunk_size,
        resume=True,
        keep_chunks=False,
    )

    nnz_counts: List[int] = []
    failures: List[Dict] = []
    for cid in chunk_ids:
        # Capture per-chunk failures instead of aborting the batch, so
        # the coordinator can retry or surface only the broken chunks.
        try:
            nnz_counts.append(assembler.run_single_chunk(cid).nnz)
        except Exception as exc:
            failures.append(
                {
                    "chunk_id": cid,
                    "error": str(exc),
                    "traceback": traceback.format_exc(),
                }
            )

    # Persist shard files so the coordinator can read them back.
    pipeline_vol.commit()
    return {
        "chunk_ids": list(chunk_ids),
        "nnz_per_chunk": nnz_counts,
        "errors": failures,
    }
24 changes: 24 additions & 0 deletions modal_app/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,13 @@ def _record_step(
PACKAGE_GPU_FUNCTIONS,
)

# Import registers ``build_matrix_chunk_worker`` on ``_calibration_app``
# so a single ``modal deploy modal_app/pipeline.py`` also deploys the
# worker via ``app.include(_calibration_app)`` below. Without this the
# dispatch layer's ``modal.Function.from_name`` lookup would fail at
# runtime.
from modal_app.matrix_chunk_worker import build_matrix_chunk_worker # noqa: F401

app.include(_calibration_app)

from modal_app.local_area import app as _local_area_app
Expand Down Expand Up @@ -681,6 +688,10 @@ def run_pipeline(
resume_run_id: str = None,
clear_checkpoints: bool = False,
version_override: str = "",
chunked_matrix: bool = False,
chunk_size: int = 25_000,
parallel_matrix: bool = False,
num_matrix_workers: int = 50,
) -> str:
"""Run the full pipeline end-to-end.

Expand All @@ -699,6 +710,15 @@ def run_pipeline(
scoped by commit SHA, so stale ones from other commits
are cleaned automatically. Use True only to force a
full rebuild of the current commit.
chunked_matrix: Build the calibration matrix in clone-household
chunks instead of the non-chunked path. Opt-in; default off.
chunk_size: Clone-household columns per chunk when
``chunked_matrix`` is True.
parallel_matrix: Fan chunked matrix building across Modal
workers via ``build_matrix_chunk_worker``. Only meaningful
when ``chunked_matrix`` is True; ignored otherwise.
num_matrix_workers: Number of Modal workers when
``parallel_matrix`` is True.

Returns:
The run ID for use with promote.
Expand Down Expand Up @@ -832,6 +852,10 @@ def run_pipeline(
workers=num_workers,
n_clones=n_clones,
run_id=run_id,
chunked_matrix=chunked_matrix,
chunk_size=chunk_size,
parallel_matrix=parallel_matrix,
num_matrix_workers=num_matrix_workers,
)
print(f" Package at: {pkg_path}")

Expand Down
30 changes: 29 additions & 1 deletion modal_app/remote_calibration_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,10 @@ def _build_package_impl(
workers: int = 8,
n_clones: int = 430,
run_id: str = "",
chunked_matrix: bool = False,
chunk_size: int = 25_000,
parallel_matrix: bool = False,
num_matrix_workers: int = 50,
) -> str:
"""Read data from pipeline volume, build X matrix, save package."""
_setup_repo()
Expand Down Expand Up @@ -401,10 +405,26 @@ def _build_package_impl(
if workers > 1:
cmd.extend(["--workers", str(workers)])
cmd.extend(["--n-clones", str(n_clones)])
if chunked_matrix:
cmd.extend(["--chunked-matrix", "--chunk-size", str(chunk_size)])
if parallel_matrix:
cmd.extend(
[
"--parallel",
"--num-matrix-workers",
str(num_matrix_workers),
]
)

build_env = os.environ.copy()
if run_id:
# ``unified_calibration.py`` reads this env var so workers can
# locate their shared state at {pipeline-artifacts}/{run_id}/
# matrix_build/chunk_build_state.pkl on the pipeline volume.
build_env["POLICYENGINE_US_DATA_RUN_ID"] = run_id
build_rc, build_lines = _run_streaming(
cmd,
env=os.environ.copy(),
env=build_env,
label="build",
)
if build_rc != 0:
Expand Down Expand Up @@ -443,6 +463,10 @@ def build_package_remote(
workers: int = 8,
n_clones: int = 430,
run_id: str = "",
chunked_matrix: bool = False,
chunk_size: int = 25_000,
parallel_matrix: bool = False,
num_matrix_workers: int = 50,
) -> str:
return _build_package_impl(
branch,
Expand All @@ -451,6 +475,10 @@ def build_package_remote(
workers=workers,
n_clones=n_clones,
run_id=run_id,
chunked_matrix=chunked_matrix,
chunk_size=chunk_size,
parallel_matrix=parallel_matrix,
num_matrix_workers=num_matrix_workers,
)


Expand Down
Loading
Loading