Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions src/microplex/calibration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,84 @@ def _build_linear_constraint_system(
return A, b, names, n_categorical


def _build_sparse_constraint_system(
data: pd.DataFrame,
marginal_targets: dict[str, dict[str, float]],
continuous_targets: dict[str, float] | None,
linear_constraints: tuple[LinearConstraint, ...],
):
"""Build a CSR linear system without the dense `np.vstack` intermediate.

Returns ``(X_csr, b, names, n_categorical)`` with exactly the same
semantics as ``_build_linear_constraint_system`` but the matrix is
`scipy.sparse.csr_matrix` built row-by-row, never allocating a
dense float64 array of shape ``(n_targets, n_records)``. At v7
scale (~1.5M records x ~4k constraints) this is the difference
between ~24 GB dense peak and ~500 MB sparse CSR.

Marginal-target rows are stored via row-indexed integer lookup
(only the `n_matching_rows` entries with value 1.0 are kept).
Continuous-target rows are dense in the record dimension but still
built without stacking. `LinearConstraint` rows are passed through
with `.nonzero()` filtering so explicit-zero coefficients don't
balloon the CSR.
"""
from scipy import sparse as _sp

indptr: list[int] = [0]
indices: list[np.ndarray] = []
data_vals: list[np.ndarray] = []
targets: list[float] = []
names: list[str] = []
n_categorical = 0
n_records = len(data)

def _append_row(col_indices: np.ndarray, values: np.ndarray) -> None:
indices.append(col_indices.astype(np.int64, copy=False))
data_vals.append(values.astype(np.float64, copy=False))
indptr.append(indptr[-1] + len(col_indices))

for var, var_targets in marginal_targets.items():
column = data[var].to_numpy()
for category, target in var_targets.items():
match = np.flatnonzero(column == category)
_append_row(match, np.ones(len(match), dtype=np.float64))
targets.append(float(target))
names.append(f"{var}={category}")
n_categorical += 1

for var, target in (continuous_targets or {}).items():
values = data[var].to_numpy(dtype=np.float64, copy=False)
nz = np.flatnonzero(values != 0.0)
_append_row(nz, values[nz])
targets.append(float(target))
names.append(var)

for constraint in linear_constraints:
coef = np.asarray(constraint.coefficients, dtype=np.float64)
nz = np.flatnonzero(coef != 0.0)
_append_row(nz, coef[nz])
targets.append(float(constraint.target))
names.append(constraint.name)

if not names:
X_csr = _sp.csr_matrix((0, n_records), dtype=np.float64)
b = np.zeros(0, dtype=np.float64)
return X_csr, b, names, n_categorical

X_csr = _sp.csr_matrix(
(
np.concatenate(data_vals) if data_vals else np.zeros(0),
np.concatenate(indices) if indices else np.zeros(0, dtype=np.int64),
np.asarray(indptr, dtype=np.int64),
),
shape=(len(names), n_records),
dtype=np.float64,
)
b = np.asarray(targets, dtype=np.float64)
return X_csr, b, names, n_categorical


def _validate_calibration_inputs(
data: pd.DataFrame,
marginal_targets: dict[str, dict[str, float]],
Expand Down
169 changes: 169 additions & 0 deletions tests/test_sparse_constraint_system.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
"""Sparse build path for the linear constraint system.

`_build_linear_constraint_system` currently materializes a dense numpy
`A` of shape `(n_targets, n_records)` via `np.vstack(rows)`. At
microplex-us v7 scale (1.5M records x ~4k+ constraints, mostly
marginal indicators that are 95%+ zero) this dense materialization
eats ~24 GB and pushes the process past jetsam. Downstream L0
calibrators then immediately convert to CSR via `sp.csr_matrix(A)`,
which already produces the correct sparse result — we just got there
through a dense intermediate that wastes the memory.

These tests pin a sparse-native builder (`_build_sparse_constraint_system`)
that returns `(X_csr, b, names, n_categorical)` without ever
materializing the dense matrix. Semantics must agree with the dense
version up to float32 rounding.
"""

from __future__ import annotations

import numpy as np
import pandas as pd
import pytest

pytest.importorskip("scipy")

from scipy import sparse

from microplex.calibration import (
LinearConstraint,
_build_linear_constraint_system,
)


def _toy_frame(n: int = 1000, seed: int = 0) -> pd.DataFrame:
rng = np.random.default_rng(seed)
return pd.DataFrame(
{
"state": rng.choice(["CA", "NY", "TX", "FL"], size=n),
"age_group": rng.choice(["0-17", "18-64", "65+"], size=n),
"income": rng.lognormal(10, 1, size=n),
}
)


class TestSparseBuilderExists:
    """The sparse builder must be importable from microplex.calibration."""

    def test_sparse_builder_is_importable(self) -> None:
        # Import failure itself fails the test; the assert additionally
        # pins down that the exported name is a callable.
        from microplex.calibration import _build_sparse_constraint_system

        builder = _build_sparse_constraint_system
        assert callable(builder)


class TestSparseBuilderMatchesDense:
    """Sparse builder output must equal dense builder output numerically."""

    def _build_both(self, data, marginal, continuous, linear):
        # Run both builders on identical inputs and return the two raw
        # result tuples side by side: (dense_result, sparse_result).
        from microplex.calibration import _build_sparse_constraint_system

        dense_result = _build_linear_constraint_system(
            data, marginal, continuous, linear
        )
        sparse_result = _build_sparse_constraint_system(
            data, marginal, continuous, linear
        )
        return dense_result, sparse_result

    def test_marginal_constraints_only(self) -> None:
        data = _toy_frame()
        marginal = {
            "state": {
                "CA": 280,
                "NY": 230,
                "TX": 260,
                "FL": 230,
            },
            "age_group": {"0-17": 230, "18-64": 570, "65+": 200},
        }
        dense_result, sparse_result = self._build_both(data, marginal, None, ())
        A_dense, b_dense, names_dense, n_cat_dense = dense_result
        X_sparse, b_sparse, names_sparse, n_cat_sparse = sparse_result

        assert sparse.issparse(X_sparse), type(X_sparse)
        assert X_sparse.shape == A_dense.shape
        # Indicator rows are exact 0/1 values, so demand bit equality.
        np.testing.assert_allclose(
            X_sparse.toarray(), A_dense, rtol=0, atol=0
        )
        np.testing.assert_array_equal(b_sparse, b_dense)
        assert names_sparse == names_dense
        assert n_cat_sparse == n_cat_dense

    def test_continuous_and_linear_constraints(self) -> None:
        data = _toy_frame()
        marginal = {"state": {"CA": 250, "NY": 250, "TX": 250, "FL": 250}}
        continuous = {"income": 20_000_000.0}
        senior_income = (
            (data["age_group"] == "65+").astype(float) * data["income"]
        )
        linear = (
            LinearConstraint(
                name="old_and_rich",
                coefficients=senior_income.to_numpy(),
                target=500_000.0,
            ),
        )
        dense_result, sparse_result = self._build_both(
            data, marginal, continuous, linear
        )
        A_dense, b_dense, names_dense, n_cat_dense = dense_result
        X_sparse, b_sparse, names_sparse, n_cat_sparse = sparse_result

        assert sparse.issparse(X_sparse)
        # Continuous/linear rows carry real floats; allow tiny rtol only.
        np.testing.assert_allclose(
            X_sparse.toarray(), A_dense, rtol=1e-12, atol=0
        )
        np.testing.assert_array_equal(b_sparse, b_dense)
        assert names_sparse == names_dense
        assert n_cat_sparse == n_cat_dense


class TestSparseBuilderActuallySparsifies:
    """Marginal indicators must actually be stored sparsely."""

    def test_marginal_density_is_low(self) -> None:
        """With 4 state x 3 age-group categories, density should be ~1/4 per target."""
        from microplex.calibration import _build_sparse_constraint_system

        data = _toy_frame(n=10_000, seed=1)
        marginal = {
            "state": {"CA": 2_500, "NY": 2_500, "TX": 2_500, "FL": 2_500},
            "age_group": {
                "0-17": 2_500,
                "18-64": 5_500,
                "65+": 2_000,
            },
        }
        X_sparse, _b, _names, _n = _build_sparse_constraint_system(
            data, marginal, None, ()
        )
        n_rows, n_cols = X_sparse.shape
        density = X_sparse.nnz / (n_rows * n_cols)
        # Each state indicator row is ~1/4 dense and each age-group row
        # at most ~1/3 dense, so the matrix-wide density is bounded by
        # max(1/4, 1/3) ≈ 0.33. Asserting < 0.45 leaves headroom while
        # still proving the CSR stores far fewer than n_rows * n_cols
        # entries.
        assert density < 0.45, density
Loading