diff --git a/src/microplex/calibration/__init__.py b/src/microplex/calibration/__init__.py index c5ebc3a..4cf4379 100644 --- a/src/microplex/calibration/__init__.py +++ b/src/microplex/calibration/__init__.py @@ -84,6 +84,84 @@ def _build_linear_constraint_system( return A, b, names, n_categorical +def _build_sparse_constraint_system( + data: pd.DataFrame, + marginal_targets: dict[str, dict[str, float]], + continuous_targets: dict[str, float] | None, + linear_constraints: tuple[LinearConstraint, ...], +): + """Build a CSR linear system without the dense `np.vstack` intermediate. + + Returns ``(X_csr, b, names, n_categorical)`` with exactly the same + semantics as ``_build_linear_constraint_system`` but the matrix is + `scipy.sparse.csr_matrix` built row-by-row, never allocating a + dense float64 array of shape ``(n_targets, n_records)``. At v7 + scale (~1.5M records x ~4k constraints) this is the difference + between ~24 GB dense peak and ~500 MB sparse CSR. + + Marginal-target rows are stored via row-indexed integer lookup + (only the `n_matching_rows` entries with value 1.0 are kept). + Continuous-target rows are dense in the record dimension but still + built without stacking. `LinearConstraint` rows are passed through + with `.nonzero()` filtering so explicit-zero coefficients don't + balloon the CSR. 
+ """ + from scipy import sparse as _sp + + indptr: list[int] = [0] + indices: list[np.ndarray] = [] + data_vals: list[np.ndarray] = [] + targets: list[float] = [] + names: list[str] = [] + n_categorical = 0 + n_records = len(data) + + def _append_row(col_indices: np.ndarray, values: np.ndarray) -> None: + indices.append(col_indices.astype(np.int64, copy=False)) + data_vals.append(values.astype(np.float64, copy=False)) + indptr.append(indptr[-1] + len(col_indices)) + + for var, var_targets in marginal_targets.items(): + column = data[var].to_numpy() + for category, target in var_targets.items(): + match = np.flatnonzero(column == category) + _append_row(match, np.ones(len(match), dtype=np.float64)) + targets.append(float(target)) + names.append(f"{var}={category}") + n_categorical += 1 + + for var, target in (continuous_targets or {}).items(): + values = data[var].to_numpy(dtype=np.float64, copy=False) + nz = np.flatnonzero(values != 0.0) + _append_row(nz, values[nz]) + targets.append(float(target)) + names.append(var) + + for constraint in linear_constraints: + coef = np.asarray(constraint.coefficients, dtype=np.float64) + nz = np.flatnonzero(coef != 0.0) + _append_row(nz, coef[nz]) + targets.append(float(constraint.target)) + names.append(constraint.name) + + if not names: + X_csr = _sp.csr_matrix((0, n_records), dtype=np.float64) + b = np.zeros(0, dtype=np.float64) + return X_csr, b, names, n_categorical + + X_csr = _sp.csr_matrix( + ( + np.concatenate(data_vals) if data_vals else np.zeros(0), + np.concatenate(indices) if indices else np.zeros(0, dtype=np.int64), + np.asarray(indptr, dtype=np.int64), + ), + shape=(len(names), n_records), + dtype=np.float64, + ) + b = np.asarray(targets, dtype=np.float64) + return X_csr, b, names, n_categorical + + def _validate_calibration_inputs( data: pd.DataFrame, marginal_targets: dict[str, dict[str, float]], diff --git a/tests/test_sparse_constraint_system.py b/tests/test_sparse_constraint_system.py new file mode 100644 
index 0000000..515d03a --- /dev/null +++ b/tests/test_sparse_constraint_system.py @@ -0,0 +1,169 @@ +"""Sparse build path for the linear constraint system. + +`_build_linear_constraint_system` currently materializes a dense numpy +`A` of shape `(n_targets, n_records)` via `np.vstack(rows)`. At +microplex-us v7 scale (1.5M records x ~4k+ constraints, mostly +marginal indicators that are 95 %+ zero) this dense materialization +eats ~24 GB and pushes the process past jetsam. Downstream L0 +calibrators then immediately convert to CSR via `sp.csr_matrix(A)`, +which already produces the correct sparse result — we just got there +through a dense intermediate that wastes the memory. + +These tests pin a sparse-native builder (`_build_sparse_constraint_system`) +that returns `(X_csr, b, names, n_categorical)` without ever +materializing the dense matrix. Semantics must agree with the dense +version up to float64 rounding. +""" + +from __future__ import annotations + +import numpy as np +import pandas as pd +import pytest + +pytest.importorskip("scipy") + +from scipy import sparse + +from microplex.calibration import ( + LinearConstraint, + _build_linear_constraint_system, +) + + +def _toy_frame(n: int = 1000, seed: int = 0) -> pd.DataFrame: + rng = np.random.default_rng(seed) + return pd.DataFrame( + { + "state": rng.choice(["CA", "NY", "TX", "FL"], size=n), + "age_group": rng.choice(["0-17", "18-64", "65+"], size=n), + "income": rng.lognormal(10, 1, size=n), + } + ) + + +class TestSparseBuilderExists: + """The sparse builder must be importable from microplex.calibration.""" + + def test_sparse_builder_is_importable(self) -> None: + from microplex.calibration import _build_sparse_constraint_system + + assert callable(_build_sparse_constraint_system) + + +class TestSparseBuilderMatchesDense: + """Sparse builder output must equal dense builder output numerically.""" + + def _build_both(self, data, marginal, continuous, linear): + from microplex.calibration import
_build_sparse_constraint_system + + A_dense, b_dense, names_dense, n_cat_dense = ( + _build_linear_constraint_system( + data, marginal, continuous, linear + ) + ) + X_sparse, b_sparse, names_sparse, n_cat_sparse = ( + _build_sparse_constraint_system( + data, marginal, continuous, linear + ) + ) + return ( + A_dense, + b_dense, + names_dense, + n_cat_dense, + X_sparse, + b_sparse, + names_sparse, + n_cat_sparse, + ) + + def test_marginal_constraints_only(self) -> None: + data = _toy_frame() + marginal = { + "state": { + "CA": 280, + "NY": 230, + "TX": 260, + "FL": 230, + }, + "age_group": {"0-17": 230, "18-64": 570, "65+": 200}, + } + ( + A_dense, + b_dense, + names_dense, + n_cat_dense, + X_sparse, + b_sparse, + names_sparse, + n_cat_sparse, + ) = self._build_both(data, marginal, None, ()) + + assert sparse.issparse(X_sparse), type(X_sparse) + assert X_sparse.shape == A_dense.shape + np.testing.assert_allclose( + X_sparse.toarray(), A_dense, rtol=0, atol=0 + ) + np.testing.assert_array_equal(b_sparse, b_dense) + assert names_sparse == names_dense + assert n_cat_sparse == n_cat_dense + + def test_continuous_and_linear_constraints(self) -> None: + data = _toy_frame() + marginal = {"state": {"CA": 250, "NY": 250, "TX": 250, "FL": 250}} + continuous = {"income": 20_000_000.0} + linear = ( + LinearConstraint( + name="old_and_rich", + coefficients=( + (data["age_group"] == "65+").astype(float) + * data["income"] + ).to_numpy(), + target=500_000.0, + ), + ) + ( + A_dense, + b_dense, + names_dense, + n_cat_dense, + X_sparse, + b_sparse, + names_sparse, + n_cat_sparse, + ) = self._build_both(data, marginal, continuous, linear) + + assert sparse.issparse(X_sparse) + np.testing.assert_allclose( + X_sparse.toarray(), A_dense, rtol=1e-12, atol=0 + ) + np.testing.assert_array_equal(b_sparse, b_dense) + assert names_sparse == names_dense + assert n_cat_sparse == n_cat_dense + + +class TestSparseBuilderActuallySparsifies: + """Marginal indicators must actually be stored 
sparsely.""" + + def test_marginal_density_is_low(self) -> None: + """Each record hits exactly one category per variable, so density stays low.""" + from microplex.calibration import _build_sparse_constraint_system + + data = _toy_frame(n=10_000, seed=1) + marginal = { + "state": {"CA": 2_500, "NY": 2_500, "TX": 2_500, "FL": 2_500}, + "age_group": { + "0-17": 2_500, + "18-64": 5_500, + "65+": 2_000, + }, + } + X_sparse, _b, _names, _n = _build_sparse_constraint_system( + data, marginal, None, () + ) + density = X_sparse.nnz / (X_sparse.shape[0] * X_sparse.shape[1]) + # Each record contributes one nonzero per marginal variable, so + # nnz = 2n across the 7 target rows: density = 2/7 ≈ 0.29. Assert + # meaningfully below dense so we know CSR is doing its job. + assert density < 0.45, density