diff --git a/pyproject.toml b/pyproject.toml index a2e5fb9..bbd4baf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -65,6 +65,12 @@ statmatch = [ l0 = [ "l0-python>=0.4", # L0 regularization for HardConcreteCalibrator ] +calibrate = [ + # Identity-preserving gradient-descent chi-squared adapter. + # microcalibrate pins Python >=3.13; the marker avoids breaking + # microplex installs on 3.11/3.12 that don't need the extra. + "microcalibrate>=0.22; python_version >= '3.13'", +] benchmark = [ "sdv>=1.0", # CT-GAN, TVAE, Copulas "scikit-learn>=1.3", # For QRF comparison @@ -78,7 +84,7 @@ docs = [ "sphinx-autodoc-typehints>=1.23", ] all = [ - "microplex[dev,benchmark,docs]", + "microplex[dev,benchmark,docs,calibrate]", ] [project.urls] diff --git a/src/microplex/calibration.py b/src/microplex/calibration/__init__.py similarity index 98% rename from src/microplex/calibration.py rename to src/microplex/calibration/__init__.py index 13ad20b..c5ebc3a 100644 --- a/src/microplex/calibration.py +++ b/src/microplex/calibration/__init__.py @@ -2009,3 +2009,22 @@ def validate(self, data: pd.DataFrame) -> dict[str, Any]: results["rmse"] = self.calibration_error_ return results + + + +# --- Optional microcalibrate adapter --------------------------------------- +# Imported when `microplex[calibrate]` extra is installed. The adapter +# pulls in torch/optuna/l0 (~1.5 GB) which microplex does not otherwise +# require, so we keep it opt-in. Callers that need identity-preserving +# gradient-descent chi-squared calibration install the extra and import +# `MicrocalibrateAdapter` / `MicrocalibrateAdapterConfig` from this +# namespace. +try: + from microplex.calibration.microcalibrate_adapter import ( # noqa: F401 + MicrocalibrateAdapter, + MicrocalibrateAdapterConfig, + ) +except ImportError: # microcalibrate not installed + MicrocalibrateAdapter = None # type: ignore[assignment] + MicrocalibrateAdapterConfig = None # type: ignore[assignment] + diff --git a/src/microplex/calibration/microcalibrate_adapter.py b/src/microplex/calibration/microcalibrate_adapter.py new file mode 100644 index 0000000..a13f149 --- /dev/null +++ b/src/microplex/calibration/microcalibrate_adapter.py @@ -0,0 +1,224 @@ +"""Country-agnostic adapter that wraps `microcalibrate.Calibration`. + +Presents the legacy `microplex.calibration.Calibrator.fit_transform` +surface on top of a gradient-descent chi-squared solver so country +packages (`microplex-us`, `microplex-uk`, etc.) share one +identity-preserving calibrator instead of duplicating the glue. Every +input record survives to the output with a non-negative weight. + +`microcalibrate` is an optional upstream dependency installed via the +``microplex[calibrate]`` extra. This module raises `ImportError` at +top-level if the extra isn't installed; `microplex.calibration`'s own +``__init__.py`` imports from here inside a ``try/except`` so callers +get the adapter when the extra is present and a clean no-op otherwise. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Sequence + +import numpy as np +import pandas as pd +from microcalibrate import Calibration + +from microplex.calibration import LinearConstraint + + +@dataclass(frozen=True) +class MicrocalibrateAdapterConfig: + """Hyperparameters for `MicrocalibrateAdapter`. + + Defaults mirror `microcalibrate.Calibration`'s own defaults + (epochs=32, learning_rate=1e-3, noise_level=10.0) except ``device``, + which microcalibrate auto-selects CUDA > MPS > CPU but we leave as + None so callers keep deterministic control. + """ + + epochs: int = 32 + learning_rate: float = 1e-3 + noise_level: float = 10.0 + dropout_rate: float = 0.0 + device: str | None = None # None = let microcalibrate auto-select + seed: int = 42 + regularize_with_l0: bool = False + l0_lambda: float = 5e-6 + init_mean: float = 0.999 + temperature: float = 0.5 + sparse_learning_rate: float = 0.2 + # Keep activation memory bounded at country-scale pipelines. 100k + # records per backward step keeps per-batch autograd activation + # under ~200 MB at k = 500 constraints (100_000 * 500 * 4 B). + # None = full-batch, which can OOM past ~500k records. + batch_size: int | None = 100_000 + + +class MicrocalibrateAdapter: + """Drop-in replacement for `Calibrator.fit_transform` / `validate`. + + Usage: + + >>> adapter = MicrocalibrateAdapter() + >>> result = adapter.fit_transform( + ... data=households_df, + ... weight_col="household_weight", + ... linear_constraints=tuple_of_LinearConstraints, + ... ) + >>> validation = adapter.validate(result) + + The returned DataFrame is a copy of ``data`` with ``weight_col`` + updated. + """ + + def __init__( + self, + config: MicrocalibrateAdapterConfig | None = None, + ) -> None: + self.config = config or MicrocalibrateAdapterConfig() + self._last_calibration: Calibration | None = None + self._last_constraint_names: list[str] | None = None + self._last_targets: np.ndarray | None = None + self._last_performance: pd.DataFrame | None = None + + def fit_transform( + self, + data: pd.DataFrame, + marginal_targets: dict[str, dict[str, float]] | None = None, + continuous_targets: dict[str, float] | None = None, + *, + weight_col: str = "weight", + linear_constraints: Sequence[LinearConstraint] = (), + ) -> pd.DataFrame: + """Calibrate weights via gradient-descent chi-squared. + + ``marginal_targets`` and ``continuous_targets`` are accepted for + signature parity with the legacy `Calibrator`, but this adapter + expects constraints to be expressed as `LinearConstraint` rows. + Callers should compile their marginal / continuous targets into + linear constraints before calling. + """ + if weight_col not in data.columns: + raise ValueError( + f"MicrocalibrateAdapter: weight column {weight_col!r} " + f"not found in data (columns: {list(data.columns)[:10]}...)" + ) + + n_records = len(data) + initial_weights = data[weight_col].to_numpy(dtype=float) + + if not linear_constraints: + # Nothing to calibrate — preserve caller expectations. + self._last_calibration = None + self._last_constraint_names = [] + self._last_targets = np.empty(0, dtype=float) + self._last_performance = None + return data.copy() + + target_names = [c.name for c in linear_constraints] + targets = np.array([c.target for c in linear_constraints], dtype=float) + + for constraint in linear_constraints: + if constraint.coefficients.shape != (n_records,): + raise ValueError( + f"MicrocalibrateAdapter: constraint {constraint.name!r} has " + f"coefficients shape {constraint.coefficients.shape}, expected " + f"({n_records},) matching the data length." + ) + + # float32 keeps the adapter's peak allocation at half the + # float64 default; microcalibrate casts to float32 anyway, so + # this is a free precision-compatible win. + estimate_matrix = pd.DataFrame( + { + c.name: np.asarray(c.coefficients, dtype=np.float32) + for c in linear_constraints + } + ) + + calibrator = Calibration( + weights=initial_weights, + targets=targets, + target_names=np.array(target_names), + estimate_matrix=estimate_matrix, + epochs=self.config.epochs, + learning_rate=self.config.learning_rate, + noise_level=self.config.noise_level, + dropout_rate=self.config.dropout_rate, + device=self.config.device, + seed=self.config.seed, + regularize_with_l0=self.config.regularize_with_l0, + l0_lambda=self.config.l0_lambda, + init_mean=self.config.init_mean, + temperature=self.config.temperature, + sparse_learning_rate=self.config.sparse_learning_rate, + batch_size=self.config.batch_size, + ) + + performance_df = calibrator.calibrate() + self._last_calibration = calibrator + self._last_constraint_names = target_names + self._last_targets = targets + self._last_performance = performance_df + + result = data.copy() + result[weight_col] = calibrator.weights + return result + + def validate(self, calibrated: pd.DataFrame | None = None) -> dict[str, Any]: + """Return validation metrics in the shape the legacy pipeline expects. + + The legacy `Calibrator.validate` returns ``{"converged", + "max_error", "sparsity", "linear_errors"}``. We populate the + same keys. ``calibrated`` is accepted for interface parity but + not read; the authoritative values come from the last + ``calibrate()`` call. + """ + if self._last_calibration is None: + return { + "converged": True, + "max_error": 0.0, + "sparsity": 0.0, + "linear_errors": {}, + } + + estimates = self._last_calibration.estimate().to_numpy(dtype=float) + targets = self._last_targets + names = self._last_constraint_names + + rel_errors = np.where( + np.abs(targets) > 1e-12, + np.abs(estimates - targets) / np.abs(targets), + np.abs(estimates - targets), + ) + linear_errors = { + name: { + "target": float(target_value), + "estimate": float(estimate_value), + "relative_error": float(rel_error), + "absolute_error": float(abs(estimate_value - target_value)), + } + for name, target_value, estimate_value, rel_error in zip( + names, targets, estimates, rel_errors, strict=True + ) + } + + max_error = float(rel_errors.max()) if rel_errors.size else 0.0 + weights = self._last_calibration.weights + sparsity = float((weights == 0).sum()) / max(len(weights), 1) + + return { + "converged": bool(max_error < 0.05), # 5 % relative error bar + "max_error": max_error, + "sparsity": sparsity, + "linear_errors": linear_errors, + } + + def performance_history(self) -> pd.DataFrame | None: + """Per-epoch performance log from microcalibrate, if available.""" + return self._last_performance + + +__all__ = [ + "MicrocalibrateAdapter", + "MicrocalibrateAdapterConfig", +] diff --git a/tests/test_microcalibrate_adapter.py b/tests/test_microcalibrate_adapter.py new file mode 100644 index 0000000..37bd735 --- /dev/null +++ b/tests/test_microcalibrate_adapter.py @@ -0,0 +1,87 @@ +"""Upstream-home tests for the country-agnostic microcalibrate adapter. + +The adapter wraps `microcalibrate.Calibration` with the legacy +`Calibrator.fit_transform` interface so country packages (microplex-us, +microplex-uk, etc.) share one identity-preserving calibrator instead of +duplicating the glue code. + +microcalibrate is an optional extra; these tests import it via the +`microplex[calibrate]` route and skip gracefully when unavailable. +""" + +from __future__ import annotations + +import numpy as np +import pandas as pd +import pytest + +from microplex.calibration import LinearConstraint + +microcalibrate = pytest.importorskip("microcalibrate") + + +def _toy_data(n_records: int = 200, seed: int = 0) -> pd.DataFrame: + rng = np.random.default_rng(seed) + return pd.DataFrame( + { + "age": rng.integers(18, 70, size=n_records), + "weight": np.ones(n_records), + } + ) + + +def _age_band( + data: pd.DataFrame, name: str, low: int, high: int, target: float +) -> LinearConstraint: + mask = (data["age"] >= low) & (data["age"] < high) + return LinearConstraint( + name=name, + coefficients=mask.astype(float).to_numpy(), + target=target, + ) + + +class TestImportSurface: + """The adapter must be importable from the canonical upstream location.""" + + def test_importable_from_microplex_calibration(self) -> None: + from microplex.calibration import ( + MicrocalibrateAdapter, + MicrocalibrateAdapterConfig, + ) + + assert MicrocalibrateAdapter is not None + assert MicrocalibrateAdapterConfig is not None + + def test_default_config_sets_batch_size(self) -> None: + """Default batch_size is set so v7-scale fits stay under RSS budget.""" + from microplex.calibration import MicrocalibrateAdapterConfig + + config = MicrocalibrateAdapterConfig() + assert config.batch_size is not None + assert config.batch_size > 0 + + +class TestConvergesOnSmallProblem: + """End-to-end check: fit_transform moves weights toward targets.""" + + def test_three_age_bands_converge(self) -> None: + from microplex.calibration import ( + MicrocalibrateAdapter, + MicrocalibrateAdapterConfig, + ) + + data = _toy_data(n_records=300) + constraints = ( + _age_band(data, "age_18_30", 18, 30, 60.0), + _age_band(data, "age_30_45", 30, 45, 90.0), + _age_band(data, "age_45_70", 45, 70, 150.0), + ) + adapter = MicrocalibrateAdapter( + MicrocalibrateAdapterConfig( + epochs=400, learning_rate=0.05, noise_level=0.0 + ) + ) + result = adapter.fit_transform(data, linear_constraints=constraints) + validation = adapter.validate(result) + assert validation["max_error"] < 0.1, validation