Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
304 changes: 304 additions & 0 deletions src/microplex_us/pipelines/donor_imputers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,304 @@
"""Donor imputer implementations for US pipeline donor synthesis."""

from __future__ import annotations

import importlib.util
from typing import Any

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier


class ColumnwiseQRFDonorImputer:
"""Columnwise QRF donor imputer, optionally with zero-inflated support."""

def __init__(
self,
*,
condition_vars: list[str],
target_vars: list[str],
n_estimators: int = 100,
zero_inflated_vars: set[str] | None = None,
nonnegative_vars: set[str] | None = None,
zero_threshold: float = 0.05,
quantiles: tuple[float, ...] = (0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95),
) -> None:
self.condition_vars = list(condition_vars)
self.target_vars = list(target_vars)
self.n_estimators = int(n_estimators)
self.zero_inflated_vars = set(zero_inflated_vars or ())
self.nonnegative_vars = set(nonnegative_vars or ())
self.zero_threshold = float(zero_threshold)
self.quantiles = tuple(float(value) for value in quantiles)
self._models: dict[str, Any] = {}
self._zero_models: dict[str, RandomForestClassifier] = {}

def fit(
self,
data: pd.DataFrame,
*,
weight_col: str | None = "weight",
epochs: int | None = None,
batch_size: int | None = None,
learning_rate: float | None = None,
verbose: bool = False,
) -> ColumnwiseQRFDonorImputer:
del weight_col, epochs, batch_size, learning_rate, verbose
if importlib.util.find_spec("quantile_forest") is None:
raise ImportError(
"quantile-forest is required for donor_imputer_backend='qrf'"
)
from quantile_forest import RandomForestQuantileRegressor

self._models = {}
self._zero_models = {}
for column in self.target_vars:
subset = data[self.condition_vars + [column]].dropna()
if len(subset) < 25:
continue
x_values = subset[self.condition_vars].to_numpy(dtype=float)
y_values = subset[column].to_numpy(dtype=float)
if (
column in self.zero_inflated_vars
and (y_values == 0).mean() >= self.zero_threshold
and (y_values == 0).sum() >= 10
and (y_values != 0).sum() >= 10
):
# Gate trained as zero vs nonzero (both signs), not as
# zero-or-negative vs positive. The old `y > 0` label
# silently dropped every negative training row along
# with zeros, so the QRF below only ever saw positive
# rows and could never emit a negative prediction - the
# v7 bug that blanked the negative tail of capital
# gains, partnership income, farm income, etc. The
# `!= 0` label is the minimal fix; the full upgrade to
# `microimpute.ZeroInflatedImputer` (regime-aware
# tripartite routing with separate positive / negative
# QRFs) is tracked as a follow-up.
zero_model = RandomForestClassifier(
n_estimators=max(50, self.n_estimators // 2),
random_state=42,
n_jobs=-1,
)
zero_model.fit(x_values, (y_values != 0).astype(int))
self._zero_models[column] = zero_model
x_values = x_values[y_values != 0]
y_values = y_values[y_values != 0]
if len(y_values) < 25:
continue
model = RandomForestQuantileRegressor(
n_estimators=self.n_estimators,
random_state=42,
n_jobs=-1,
)
model.fit(x_values, y_values)
self._models[column] = model
return self

def generate(
self,
conditions: pd.DataFrame,
seed: int | None = None,
) -> pd.DataFrame:
rng = np.random.RandomState(seed or 42)
synthetic = conditions.copy().reset_index(drop=True)
x_values = synthetic[self.condition_vars].to_numpy(dtype=float)
for column in self.target_vars:
model = self._models.get(column)
if model is None:
synthetic[column] = np.nan
continue
values = np.zeros(len(synthetic), dtype=float)
target_rows = np.ones(len(synthetic), dtype=bool)
zero_model = self._zero_models.get(column)
if zero_model is not None:
probabilities = zero_model.predict_proba(x_values)
positive_probs = (
probabilities[:, 1]
if probabilities.shape[1] > 1
else np.zeros(len(synthetic), dtype=float)
)
target_rows = rng.random(len(synthetic)) < positive_probs
values[:] = 0.0
if target_rows.any():
predictions = model.predict(
x_values[target_rows],
quantiles=list(self.quantiles),
)
quantile_choices = rng.choice(
len(self.quantiles), size=target_rows.sum()
)
draws = predictions[np.arange(target_rows.sum()), quantile_choices]
if column in self.nonnegative_vars:
draws = np.maximum(draws, 0.0)
values[target_rows] = draws
synthetic[column] = values
return synthetic


class RegimeAwareDonorImputer:
"""Donor imputer that wraps `microimpute.ZeroInflatedImputer` per column.

Each target is fit with an independent `ZeroInflatedImputer`, which
auto-detects one of seven regimes (THREE_SIGN / ZI_POSITIVE /
ZI_NEGATIVE / SIGN_ONLY / POSITIVE_ONLY / NEGATIVE_ONLY /
DEGENERATE_ZERO) from the training distribution and composes a
gate classifier + one or two base imputers as appropriate.

Key advantages over `ColumnwiseQRFDonorImputer`:

1. Negative values in training are preserved in predictions for
three-sign targets (capital gains, partnership/S-corp income,
farm income, rental income). The v7 `y > 0` bug is structurally
impossible under regime-aware routing.
2. Predictions on three-sign targets never land in the interior
band between ``max(train_neg)`` and ``min(train_pos)`` - the
tripartite gate routes to sign-specific base imputers that each
see only one sign of training data.

This class is a thin columnwise adapter: one `ZeroInflatedImputer`
is fit per target, using `microimpute.QRF` as the base. Fit and
generate work column-by-column so memory scales with the single
largest base imputer, not with the total target count.
"""

def __init__(
self,
condition_vars: list[str],
target_vars: list[str],
n_estimators: int = 100,
nonnegative_vars: set[str] | None = None,
classifier_type: str = "hist_gb",
min_class_count: int = 10,
min_class_fraction: float = 0.01,
seed: int = 42,
) -> None:
self.condition_vars = list(condition_vars)
self.target_vars = list(target_vars)
self.n_estimators = int(n_estimators)
self.nonnegative_vars = set(nonnegative_vars or ())
self.classifier_type = str(classifier_type)
self.min_class_count = int(min_class_count)
self.min_class_fraction = float(min_class_fraction)
self.seed = int(seed)
self._fitted: dict[str, Any] = {}
self._regimes: dict[str, str] = {}

def fit(
self,
data: pd.DataFrame,
*,
weight_col: str | None = "weight",
epochs: int | None = None,
batch_size: int | None = None,
learning_rate: float | None = None,
verbose: bool = False,
) -> RegimeAwareDonorImputer:
del weight_col, epochs, batch_size, learning_rate, verbose

if importlib.util.find_spec("microimpute.models.zero_inflated") is None:
raise ImportError(
"microimpute with microimpute.models.zero_inflated is required "
"for donor_imputer_backend='regime_aware'."
)
if importlib.util.find_spec("quantile_forest") is None:
raise ImportError(
"quantile-forest is required for the RegimeAwareDonorImputer "
"base QRF."
)

from microimpute.models.qrf import QRF
from microimpute.models.zero_inflated import ZeroInflatedImputer

self._fitted = {}
self._regimes = {}
for column in self.target_vars:
subset = data[self.condition_vars + [column]].dropna()
if len(subset) < 25:
continue
# base_imputer_kwargs={} because microimpute 2.x's
# ZeroInflatedImputer._fit_base_single already passes
# log_level="ERROR" to the base, and duplicating it here
# raises TypeError. Upstream fix tracked.
wrapper = ZeroInflatedImputer(
base_imputer_class=QRF,
base_imputer_kwargs={},
min_class_count=self.min_class_count,
min_class_fraction=self.min_class_fraction,
classifier_type=self.classifier_type,
seed=self.seed,
)
fitted = wrapper.fit(
subset,
predictors=list(self.condition_vars),
imputed_variables=[column],
)
self._fitted[column] = fitted
self._regimes[column] = wrapper.get_regime(column)
return self

def generate(
self,
conditions: pd.DataFrame,
seed: int | None = None,
) -> pd.DataFrame:
synthetic = conditions.copy().reset_index(drop=True)
master_seed = self.seed if seed is None else int(seed)
master_rng = np.random.default_rng(master_seed)
for column in self.target_vars:
fitted = self._fitted.get(column)
if fitted is None:
synthetic[column] = np.nan
continue
column_seed = int(
master_rng.integers(0, np.iinfo(np.int32).max, dtype=np.int64)
)
self._reset_prediction_rngs(fitted, seed=column_seed)
preds = fitted.predict(synthetic[self.condition_vars])
values = preds[column].to_numpy(dtype=float)
if column in self.nonnegative_vars:
values = np.maximum(values, 0.0)
synthetic[column] = values
return synthetic

def _reset_prediction_rngs(
self,
obj: Any,
*,
seed: int,
visited: set[int] | None = None,
) -> None:
if visited is None:
visited = set()
if obj is None or isinstance(obj, (str, bytes, int, float, bool)):
return
object_id = id(obj)
if object_id in visited:
return
visited.add(object_id)

if hasattr(obj, "_rng"):
obj._rng = np.random.default_rng(seed)
child_rng = np.random.default_rng(seed)

if isinstance(obj, dict):
children = list(obj.values())
elif isinstance(obj, (list, tuple, set)):
children = list(obj)
else:
children = []
for attr_name in ("models", "_per_variable", "_non_numeric_bundle"):
child = getattr(obj, attr_name, None)
if child is not None:
children.append(child)

for child in children:
child_seed = int(
child_rng.integers(0, np.iinfo(np.int32).max, dtype=np.int64)
)
self._reset_prediction_rngs(
child,
seed=child_seed,
visited=visited,
)
Loading
Loading