Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ statmatch = [
l0 = [
"l0-python>=0.4", # L0 regularization for HardConcreteCalibrator
]
calibrate = [
# Identity-preserving gradient-descent chi-squared adapter.
# microcalibrate pins Python >=3.13; the marker avoids breaking
# microplex installs on 3.11/3.12 that don't need the extra.
"microcalibrate>=0.22; python_version >= '3.13'",
]
benchmark = [
"sdv>=1.0", # CT-GAN, TVAE, Copulas
"scikit-learn>=1.3", # For QRF comparison
Expand All @@ -78,7 +84,7 @@ docs = [
"sphinx-autodoc-typehints>=1.23",
]
all = [
"microplex[dev,benchmark,docs]",
"microplex[dev,benchmark,docs,calibrate]",
]

[project.urls]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2009,3 +2009,22 @@ def validate(self, data: pd.DataFrame) -> dict[str, Any]:
results["rmse"] = self.calibration_error_

return results



# --- Optional microcalibrate adapter ---------------------------------------
# Imported when `microplex[calibrate]` extra is installed. The adapter
# pulls in torch/optuna/l0 (~1.5 GB) which microplex does not otherwise
# require, so we keep it opt-in. Callers that need identity-preserving
# gradient-descent chi-squared calibration install the extra and import
# `MicrocalibrateAdapter` / `MicrocalibrateAdapterConfig` from this
# namespace.
try:
from microplex.calibration.microcalibrate_adapter import ( # noqa: F401
MicrocalibrateAdapter,
MicrocalibrateAdapterConfig,
)
except ImportError: # microcalibrate not installed
MicrocalibrateAdapter = None # type: ignore[assignment]
MicrocalibrateAdapterConfig = None # type: ignore[assignment]

224 changes: 224 additions & 0 deletions src/microplex/calibration/microcalibrate_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
"""Country-agnostic adapter that wraps `microcalibrate.Calibration`.

Presents the legacy `microplex.calibration.Calibrator.fit_transform`
surface on top of a gradient-descent chi-squared solver so country
packages (`microplex-us`, `microplex-uk`, etc.) share one
identity-preserving calibrator instead of duplicating the glue. Every
input record survives to the output with a non-negative weight.

`microcalibrate` is an optional upstream dependency installed via the
``microplex[calibrate]`` extra. This module raises `ImportError` at
top-level if the extra isn't installed; `microplex.calibration`'s own
``__init__.py`` imports from here inside a ``try/except`` so callers
get the adapter when the extra is present and a clean no-op otherwise.
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Sequence

import numpy as np
import pandas as pd
from microcalibrate import Calibration

from microplex.calibration import LinearConstraint


@dataclass(frozen=True)
class MicrocalibrateAdapterConfig:
    """Hyperparameters for `MicrocalibrateAdapter`.

    Every field except ``batch_size`` and ``device`` is forwarded
    verbatim to `microcalibrate.Calibration` by
    `MicrocalibrateAdapter.fit_transform`. Defaults mirror
    `microcalibrate.Calibration`'s own defaults
    (epochs=32, learning_rate=1e-3, noise_level=10.0) except ``device``,
    which microcalibrate auto-selects CUDA > MPS > CPU but we leave as
    None so callers keep deterministic control.
    """

    # Gradient-descent loop controls (semantics defined by
    # microcalibrate — see its Calibration docs for details).
    epochs: int = 32
    learning_rate: float = 1e-3
    noise_level: float = 10.0
    dropout_rate: float = 0.0
    device: str | None = None  # None = let microcalibrate auto-select
    seed: int = 42
    # L0 sparsity regularization knobs; only consulted by
    # microcalibrate when regularize_with_l0 is True (presumably —
    # TODO confirm against microcalibrate's Calibration signature).
    regularize_with_l0: bool = False
    l0_lambda: float = 5e-6
    init_mean: float = 0.999
    temperature: float = 0.5
    sparse_learning_rate: float = 0.2
    # Keep activation memory bounded at country-scale pipelines. 100k
    # records per backward step keeps per-batch autograd activation
    # under ~200 MB at k = 500 constraints (100_000 * 500 * 4 B).
    # None = full-batch, which can OOM past ~500k records.
    batch_size: int | None = 100_000


class MicrocalibrateAdapter:
    """Drop-in replacement for `Calibrator.fit_transform` / `validate`.

    Usage:

        >>> adapter = MicrocalibrateAdapter()
        >>> result = adapter.fit_transform(
        ...     data=households_df,
        ...     weight_col="household_weight",
        ...     linear_constraints=tuple_of_LinearConstraints,
        ... )
        >>> validation = adapter.validate(result)

    The returned DataFrame is a copy of ``data`` with ``weight_col``
    updated.
    """

    def __init__(
        self,
        config: MicrocalibrateAdapterConfig | None = None,
    ) -> None:
        """Store hyperparameters; ``None`` selects the stock defaults."""
        self.config = config or MicrocalibrateAdapterConfig()
        # State captured by the most recent fit_transform() call; read
        # back by validate() and performance_history().
        self._last_calibration: Calibration | None = None
        self._last_constraint_names: list[str] | None = None
        self._last_targets: np.ndarray | None = None
        self._last_performance: pd.DataFrame | None = None

    def fit_transform(
        self,
        data: pd.DataFrame,
        marginal_targets: dict[str, dict[str, float]] | None = None,
        continuous_targets: dict[str, float] | None = None,
        *,
        weight_col: str = "weight",
        linear_constraints: Sequence[LinearConstraint] = (),
    ) -> pd.DataFrame:
        """Calibrate weights via gradient-descent chi-squared.

        ``marginal_targets`` and ``continuous_targets`` are accepted for
        signature parity with the legacy `Calibrator`, but this adapter
        expects constraints to be expressed as `LinearConstraint` rows.
        Callers should compile their marginal / continuous targets into
        linear constraints before calling.

        Returns a copy of ``data`` with ``weight_col`` replaced by the
        calibrated weights. Raises ``ValueError`` when ``weight_col`` is
        missing or a constraint's coefficient vector does not match the
        data length.
        """
        if weight_col not in data.columns:
            raise ValueError(
                f"MicrocalibrateAdapter: weight column {weight_col!r} "
                f"not found in data (columns: {list(data.columns)[:10]}...)"
            )

        if not linear_constraints:
            # Nothing to calibrate — preserve caller expectations by
            # returning an untouched copy, and clear stale state so a
            # later validate() reports the trivial all-good result.
            self._last_calibration = None
            self._last_constraint_names = []
            self._last_targets = np.empty(0, dtype=float)
            self._last_performance = None
            return data.copy()

        n_records = len(data)
        target_names = [c.name for c in linear_constraints]
        targets = np.array([c.target for c in linear_constraints], dtype=float)

        # Convert each coefficient vector exactly once, then validate
        # the converted array. Checking the converted array (rather
        # than the raw attribute) also works for list-like coefficient
        # containers that lack a ``.shape``. float32 keeps the
        # adapter's peak allocation at half the float64 default;
        # microcalibrate casts to float32 anyway, so this is a free
        # precision-compatible win.
        coefficient_columns: dict[str, np.ndarray] = {}
        for constraint in linear_constraints:
            coefficients = np.asarray(constraint.coefficients, dtype=np.float32)
            if coefficients.shape != (n_records,):
                raise ValueError(
                    f"MicrocalibrateAdapter: constraint {constraint.name!r} has "
                    f"coefficients shape {coefficients.shape}, expected "
                    f"({n_records},) matching the data length."
                )
            coefficient_columns[constraint.name] = coefficients

        estimate_matrix = pd.DataFrame(coefficient_columns)
        initial_weights = data[weight_col].to_numpy(dtype=float)

        calibrator = Calibration(
            weights=initial_weights,
            targets=targets,
            target_names=np.array(target_names),
            estimate_matrix=estimate_matrix,
            epochs=self.config.epochs,
            learning_rate=self.config.learning_rate,
            noise_level=self.config.noise_level,
            dropout_rate=self.config.dropout_rate,
            device=self.config.device,
            seed=self.config.seed,
            regularize_with_l0=self.config.regularize_with_l0,
            l0_lambda=self.config.l0_lambda,
            init_mean=self.config.init_mean,
            temperature=self.config.temperature,
            sparse_learning_rate=self.config.sparse_learning_rate,
            batch_size=self.config.batch_size,
        )

        performance_df = calibrator.calibrate()
        self._last_calibration = calibrator
        self._last_constraint_names = target_names
        self._last_targets = targets
        self._last_performance = performance_df

        result = data.copy()
        result[weight_col] = calibrator.weights
        return result

    def validate(self, calibrated: pd.DataFrame | None = None) -> dict[str, Any]:
        """Return validation metrics in the shape the legacy pipeline expects.

        The legacy `Calibrator.validate` returns ``{"converged",
        "max_error", "sparsity", "linear_errors"}``. We populate the
        same keys. ``calibrated`` is accepted for interface parity but
        not read; the authoritative values come from the last
        ``calibrate()`` call.
        """
        if self._last_calibration is None:
            # No (or empty) calibration has run: trivially converged.
            return {
                "converged": True,
                "max_error": 0.0,
                "sparsity": 0.0,
                "linear_errors": {},
            }

        estimates = self._last_calibration.estimate().to_numpy(dtype=float)
        targets = self._last_targets
        names = self._last_constraint_names

        # Relative error where the target is meaningfully non-zero;
        # absolute error otherwise (relative error is undefined at 0).
        rel_errors = np.where(
            np.abs(targets) > 1e-12,
            np.abs(estimates - targets) / np.abs(targets),
            np.abs(estimates - targets),
        )
        linear_errors = {
            name: {
                "target": float(target_value),
                "estimate": float(estimate_value),
                "relative_error": float(rel_error),
                "absolute_error": float(abs(estimate_value - target_value)),
            }
            for name, target_value, estimate_value, rel_error in zip(
                names, targets, estimates, rel_errors, strict=True
            )
        }

        max_error = float(rel_errors.max()) if rel_errors.size else 0.0
        weights = self._last_calibration.weights
        sparsity = float((weights == 0).sum()) / max(len(weights), 1)

        return {
            "converged": bool(max_error < 0.05),  # 5 % relative error bar
            "max_error": max_error,
            "sparsity": sparsity,
            "linear_errors": linear_errors,
        }

    def performance_history(self) -> pd.DataFrame | None:
        """Per-epoch performance log from microcalibrate, if available."""
        return self._last_performance


__all__ = [
"MicrocalibrateAdapter",
"MicrocalibrateAdapterConfig",
]
87 changes: 87 additions & 0 deletions tests/test_microcalibrate_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""Upstream-home tests for the country-agnostic microcalibrate adapter.

The adapter wraps `microcalibrate.Calibration` with the legacy
`Calibrator.fit_transform` interface so country packages (microplex-us,
microplex-uk, etc.) share one identity-preserving calibrator instead of
duplicating the glue code.

microcalibrate is an optional extra; these tests import it via the
`microplex[calibrate]` route and skip gracefully when unavailable.
"""

from __future__ import annotations

import numpy as np
import pandas as pd
import pytest

from microplex.calibration import LinearConstraint

microcalibrate = pytest.importorskip("microcalibrate")


def _toy_data(n_records: int = 200, seed: int = 0) -> pd.DataFrame:
rng = np.random.default_rng(seed)
return pd.DataFrame(
{
"age": rng.integers(18, 70, size=n_records),
"weight": np.ones(n_records),
}
)


def _age_band(
    data: pd.DataFrame, name: str, low: int, high: int, target: float
) -> LinearConstraint:
    """One LinearConstraint selecting records with ``low <= age < high``."""
    in_band = data["age"].ge(low) & data["age"].lt(high)
    coefficients = in_band.astype(float).to_numpy()
    return LinearConstraint(name=name, coefficients=coefficients, target=target)


class TestImportSurface:
    """The adapter must be importable from the canonical upstream location."""

    def test_importable_from_microplex_calibration(self) -> None:
        """Both adapter names resolve (non-None) from the package namespace."""
        from microplex.calibration import (
            MicrocalibrateAdapter,
            MicrocalibrateAdapterConfig,
        )

        for exported in (MicrocalibrateAdapter, MicrocalibrateAdapterConfig):
            assert exported is not None

    def test_default_config_sets_batch_size(self) -> None:
        """Default batch_size is set so v7-scale fits stay under RSS budget."""
        from microplex.calibration import MicrocalibrateAdapterConfig

        default_batch = MicrocalibrateAdapterConfig().batch_size
        assert default_batch is not None
        assert default_batch > 0


class TestConvergesOnSmallProblem:
    """End-to-end check: fit_transform moves weights toward targets."""

    def test_three_age_bands_converge(self) -> None:
        """Three disjoint age-band totals are hit within 10 % relative error."""
        from microplex.calibration import (
            MicrocalibrateAdapter,
            MicrocalibrateAdapterConfig,
        )

        data = _toy_data(n_records=300)
        band_specs = (
            ("age_18_30", 18, 30, 60.0),
            ("age_30_45", 30, 45, 90.0),
            ("age_45_70", 45, 70, 150.0),
        )
        constraints = tuple(
            _age_band(data, name, low, high, target)
            for name, low, high, target in band_specs
        )
        config = MicrocalibrateAdapterConfig(
            epochs=400, learning_rate=0.05, noise_level=0.0
        )
        adapter = MicrocalibrateAdapter(config)
        result = adapter.fit_transform(data, linear_constraints=constraints)
        validation = adapter.validate(result)
        assert validation["max_error"] < 0.1, validation
Loading