diff --git a/src/microplex_us/pipelines/donor_imputers.py b/src/microplex_us/pipelines/donor_imputers.py new file mode 100644 index 0000000..bc1bc3f --- /dev/null +++ b/src/microplex_us/pipelines/donor_imputers.py @@ -0,0 +1,304 @@ +"""Donor imputer implementations for US pipeline donor synthesis.""" + +from __future__ import annotations + +import importlib.util +from typing import Any + +import numpy as np +import pandas as pd +from sklearn.ensemble import RandomForestClassifier + + +class ColumnwiseQRFDonorImputer: + """Columnwise QRF donor imputer, optionally with zero-inflated support.""" + + def __init__( + self, + *, + condition_vars: list[str], + target_vars: list[str], + n_estimators: int = 100, + zero_inflated_vars: set[str] | None = None, + nonnegative_vars: set[str] | None = None, + zero_threshold: float = 0.05, + quantiles: tuple[float, ...] = (0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95), + ) -> None: + self.condition_vars = list(condition_vars) + self.target_vars = list(target_vars) + self.n_estimators = int(n_estimators) + self.zero_inflated_vars = set(zero_inflated_vars or ()) + self.nonnegative_vars = set(nonnegative_vars or ()) + self.zero_threshold = float(zero_threshold) + self.quantiles = tuple(float(value) for value in quantiles) + self._models: dict[str, Any] = {} + self._zero_models: dict[str, RandomForestClassifier] = {} + + def fit( + self, + data: pd.DataFrame, + *, + weight_col: str | None = "weight", + epochs: int | None = None, + batch_size: int | None = None, + learning_rate: float | None = None, + verbose: bool = False, + ) -> ColumnwiseQRFDonorImputer: + del weight_col, epochs, batch_size, learning_rate, verbose + if importlib.util.find_spec("quantile_forest") is None: + raise ImportError( + "quantile-forest is required for donor_imputer_backend='qrf'" + ) + from quantile_forest import RandomForestQuantileRegressor + + self._models = {} + self._zero_models = {} + for column in self.target_vars: + subset = data[self.condition_vars + [column]].dropna() + if len(subset) < 25: + continue + x_values = subset[self.condition_vars].to_numpy(dtype=float) + y_values = subset[column].to_numpy(dtype=float) + if ( + column in self.zero_inflated_vars + and (y_values == 0).mean() >= self.zero_threshold + and (y_values == 0).sum() >= 10 + and (y_values != 0).sum() >= 10 + ): + # Gate trained as zero vs nonzero (both signs), not as + # zero-or-negative vs positive. The old `y > 0` label + # silently dropped every negative training row along + # with zeros, so the QRF below only ever saw positive + # rows and could never emit a negative prediction - the + # v7 bug that blanked the negative tail of capital + # gains, partnership income, farm income, etc. The + # `!= 0` label is the minimal fix; the full upgrade to + # `microimpute.ZeroInflatedImputer` (regime-aware + # tripartite routing with separate positive / negative + # QRFs) is tracked as a follow-up. + zero_model = RandomForestClassifier( + n_estimators=max(50, self.n_estimators // 2), + random_state=42, + n_jobs=-1, + ) + zero_model.fit(x_values, (y_values != 0).astype(int)) + self._zero_models[column] = zero_model + x_values = x_values[y_values != 0] + y_values = y_values[y_values != 0] + if len(y_values) < 25: + continue + model = RandomForestQuantileRegressor( + n_estimators=self.n_estimators, + random_state=42, + n_jobs=-1, + ) + model.fit(x_values, y_values) + self._models[column] = model + return self + + def generate( + self, + conditions: pd.DataFrame, + seed: int | None = None, + ) -> pd.DataFrame: + rng = np.random.RandomState(seed or 42) + synthetic = conditions.copy().reset_index(drop=True) + x_values = synthetic[self.condition_vars].to_numpy(dtype=float) + for column in self.target_vars: + model = self._models.get(column) + if model is None: + synthetic[column] = np.nan + continue + values = np.zeros(len(synthetic), dtype=float) + target_rows = np.ones(len(synthetic), dtype=bool) + zero_model = self._zero_models.get(column) + if zero_model is not None: + probabilities = zero_model.predict_proba(x_values) + positive_probs = ( + probabilities[:, 1] + if probabilities.shape[1] > 1 + else np.zeros(len(synthetic), dtype=float) + ) + target_rows = rng.random(len(synthetic)) < positive_probs + values[:] = 0.0 + if target_rows.any(): + predictions = model.predict( + x_values[target_rows], + quantiles=list(self.quantiles), + ) + quantile_choices = rng.choice( + len(self.quantiles), size=target_rows.sum() + ) + draws = predictions[np.arange(target_rows.sum()), quantile_choices] + if column in self.nonnegative_vars: + draws = np.maximum(draws, 0.0) + values[target_rows] = draws + synthetic[column] = values + return synthetic + + +class RegimeAwareDonorImputer: + """Donor imputer that wraps `microimpute.ZeroInflatedImputer` per column. + + Each target is fit with an independent `ZeroInflatedImputer`, which + auto-detects one of seven regimes (THREE_SIGN / ZI_POSITIVE / + ZI_NEGATIVE / SIGN_ONLY / POSITIVE_ONLY / NEGATIVE_ONLY / + DEGENERATE_ZERO) from the training distribution and composes a + gate classifier + one or two base imputers as appropriate. + + Key advantages over `ColumnwiseQRFDonorImputer`: + + 1. Negative values in training are preserved in predictions for + three-sign targets (capital gains, partnership/S-corp income, + farm income, rental income). The v7 `y > 0` bug is structurally + impossible under regime-aware routing. + 2. Predictions on three-sign targets never land in the interior + band between ``max(train_neg)`` and ``min(train_pos)`` - the + tripartite gate routes to sign-specific base imputers that each + see only one sign of training data. + + This class is a thin columnwise adapter: one `ZeroInflatedImputer` + is fit per target, using `microimpute.QRF` as the base. Fit and + generate work column-by-column so memory scales with the single + largest base imputer, not with the total target count. + """ + + def __init__( + self, + condition_vars: list[str], + target_vars: list[str], + n_estimators: int = 100, + nonnegative_vars: set[str] | None = None, + classifier_type: str = "hist_gb", + min_class_count: int = 10, + min_class_fraction: float = 0.01, + seed: int = 42, + ) -> None: + self.condition_vars = list(condition_vars) + self.target_vars = list(target_vars) + self.n_estimators = int(n_estimators) + self.nonnegative_vars = set(nonnegative_vars or ()) + self.classifier_type = str(classifier_type) + self.min_class_count = int(min_class_count) + self.min_class_fraction = float(min_class_fraction) + self.seed = int(seed) + self._fitted: dict[str, Any] = {} + self._regimes: dict[str, str] = {} + + def fit( + self, + data: pd.DataFrame, + *, + weight_col: str | None = "weight", + epochs: int | None = None, + batch_size: int | None = None, + learning_rate: float | None = None, + verbose: bool = False, + ) -> RegimeAwareDonorImputer: + del weight_col, epochs, batch_size, learning_rate, verbose + + if importlib.util.find_spec("microimpute.models.zero_inflated") is None: + raise ImportError( + "microimpute with microimpute.models.zero_inflated is required " + "for donor_imputer_backend='regime_aware'." + ) + if importlib.util.find_spec("quantile_forest") is None: + raise ImportError( + "quantile-forest is required for the RegimeAwareDonorImputer " + "base QRF." + ) + + from microimpute.models.qrf import QRF + from microimpute.models.zero_inflated import ZeroInflatedImputer + + self._fitted = {} + self._regimes = {} + for column in self.target_vars: + subset = data[self.condition_vars + [column]].dropna() + if len(subset) < 25: + continue + # base_imputer_kwargs={} because microimpute 2.x's + # ZeroInflatedImputer._fit_base_single already passes + # log_level="ERROR" to the base, and duplicating it here + # raises TypeError. Upstream fix tracked. + wrapper = ZeroInflatedImputer( + base_imputer_class=QRF, + base_imputer_kwargs={}, + min_class_count=self.min_class_count, + min_class_fraction=self.min_class_fraction, + classifier_type=self.classifier_type, + seed=self.seed, + ) + fitted = wrapper.fit( + subset, + predictors=list(self.condition_vars), + imputed_variables=[column], + ) + self._fitted[column] = fitted + self._regimes[column] = wrapper.get_regime(column) + return self + + def generate( + self, + conditions: pd.DataFrame, + seed: int | None = None, + ) -> pd.DataFrame: + synthetic = conditions.copy().reset_index(drop=True) + master_seed = self.seed if seed is None else int(seed) + master_rng = np.random.default_rng(master_seed) + for column in self.target_vars: + fitted = self._fitted.get(column) + if fitted is None: + synthetic[column] = np.nan + continue + column_seed = int( + master_rng.integers(0, np.iinfo(np.int32).max, dtype=np.int64) + ) + self._reset_prediction_rngs(fitted, seed=column_seed) + preds = fitted.predict(synthetic[self.condition_vars]) + values = preds[column].to_numpy(dtype=float) + if column in self.nonnegative_vars: + values = np.maximum(values, 0.0) + synthetic[column] = values + return synthetic + + def _reset_prediction_rngs( + self, + obj: Any, + *, + seed: int, + visited: set[int] | None = None, + ) -> None: + if visited is None: + visited = set() + if obj is None or isinstance(obj, (str, bytes, int, float, bool)): + return + object_id = id(obj) + if object_id in visited: + return + visited.add(object_id) + + if hasattr(obj, "_rng"): + obj._rng = np.random.default_rng(seed) + child_rng = np.random.default_rng(seed) + + if isinstance(obj, dict): + children = list(obj.values()) + elif isinstance(obj, (list, tuple, set)): + children = list(obj) + else: + children = [] + for attr_name in ("models", "_per_variable", "_non_numeric_bundle"): + child = getattr(obj, attr_name, None) + if child is not None: + children.append(child) + + for child in children: + child_seed = int( + child_rng.integers(0, np.iinfo(np.int32).max, dtype=np.int64) + ) + self._reset_prediction_rngs( + child, + seed=child_seed, + visited=visited, + ) diff --git a/src/microplex_us/pipelines/us.py b/src/microplex_us/pipelines/us.py index 076357e..ec541a2 100644 --- a/src/microplex_us/pipelines/us.py +++ b/src/microplex_us/pipelines/us.py @@ -2,7 +2,6 @@ from __future__ import annotations -import importlib.util import logging import sys import time @@ -40,13 +39,16 @@ from microplex.hierarchical import TaxUnitOptimizer from microplex.synthesizer import Synthesizer from microplex.targets import TargetQuery, TargetSpec -from sklearn.ensemble import RandomForestClassifier from microplex_us.pe_source_impute_engine import ( PE_SOURCE_IMPUTE_BLOCK_ENGINE, PESourceImputeBlockRunRequest, PESourceImputeConditionedBlockRunRequest, ) +from microplex_us.pipelines.donor_imputers import ( + ColumnwiseQRFDonorImputer, + RegimeAwareDonorImputer, +) from microplex_us.pipelines.pe_l0 import PolicyEngineL0Calibrator from microplex_us.pipelines.pe_native_optimization import ( optimize_policyengine_us_native_loss_dataset, @@ -176,300 +178,6 @@ def _emit_us_pipeline_progress(message: str, /, **context: object) -> None: AGE_BINS = [0, 18, 35, 55, 65, np.inf] -class ColumnwiseQRFDonorImputer: - """Columnwise QRF donor imputer, optionally with zero-inflated support.""" - - def __init__( - self, - *, - condition_vars: list[str], - target_vars: list[str], - n_estimators: int = 100, - zero_inflated_vars: set[str] | None = None, - nonnegative_vars: set[str] | None = None, - zero_threshold: float = 0.05, - quantiles: tuple[float, ...] = (0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95), - ) -> None: - self.condition_vars = list(condition_vars) - self.target_vars = list(target_vars) - self.n_estimators = int(n_estimators) - self.zero_inflated_vars = set(zero_inflated_vars or ()) - self.nonnegative_vars = set(nonnegative_vars or ()) - self.zero_threshold = float(zero_threshold) - self.quantiles = tuple(float(value) for value in quantiles) - self._models: dict[str, Any] = {} - self._zero_models: dict[str, RandomForestClassifier] = {} - - def fit( - self, - data: pd.DataFrame, - *, - weight_col: str | None = "weight", - epochs: int | None = None, - batch_size: int | None = None, - learning_rate: float | None = None, - verbose: bool = False, - ) -> ColumnwiseQRFDonorImputer: - del weight_col, epochs, batch_size, learning_rate, verbose - if importlib.util.find_spec("quantile_forest") is None: - raise ImportError( - "quantile-forest is required for donor_imputer_backend='qrf'" - ) - from quantile_forest import RandomForestQuantileRegressor - - self._models = {} - self._zero_models = {} - for column in self.target_vars: - subset = data[self.condition_vars + [column]].dropna() - if len(subset) < 25: - continue - x_values = subset[self.condition_vars].to_numpy(dtype=float) - y_values = subset[column].to_numpy(dtype=float) - if ( - column in self.zero_inflated_vars - and (y_values == 0).mean() >= self.zero_threshold - and (y_values == 0).sum() >= 10 - and (y_values != 0).sum() >= 10 - ): - # Gate trained as zero vs nonzero (both signs), not as - # zero-or-negative vs positive. The old `y > 0` label - # silently dropped every negative training row along - # with zeros, so the QRF below only ever saw positive - # rows and could never emit a negative prediction — the - # v7 bug that blanked the negative tail of capital - # gains, partnership income, farm income, etc. The - # `!= 0` label is the minimal fix; the full upgrade to - # `microimpute.ZeroInflatedImputer` (regime-aware - # tripartite routing with separate positive / negative - # QRFs) is tracked as a follow-up. - zero_model = RandomForestClassifier( - n_estimators=max(50, self.n_estimators // 2), - random_state=42, - n_jobs=-1, - ) - zero_model.fit(x_values, (y_values != 0).astype(int)) - self._zero_models[column] = zero_model - x_values = x_values[y_values != 0] - y_values = y_values[y_values != 0] - if len(y_values) < 25: - continue - model = RandomForestQuantileRegressor( - n_estimators=self.n_estimators, - random_state=42, - n_jobs=-1, - ) - model.fit(x_values, y_values) - self._models[column] = model - return self - - def generate( - self, - conditions: pd.DataFrame, - seed: int | None = None, - ) -> pd.DataFrame: - rng = np.random.RandomState(seed or 42) - synthetic = conditions.copy().reset_index(drop=True) - x_values = synthetic[self.condition_vars].to_numpy(dtype=float) - for column in self.target_vars: - model = self._models.get(column) - if model is None: - synthetic[column] = np.nan - continue - values = np.zeros(len(synthetic), dtype=float) - target_rows = np.ones(len(synthetic), dtype=bool) - zero_model = self._zero_models.get(column) - if zero_model is not None: - probabilities = zero_model.predict_proba(x_values) - positive_probs = ( - probabilities[:, 1] - if probabilities.shape[1] > 1 - else np.zeros(len(synthetic), dtype=float) - ) - target_rows = rng.random(len(synthetic)) < positive_probs - values[:] = 0.0 - if target_rows.any(): - predictions = model.predict( - x_values[target_rows], - quantiles=list(self.quantiles), - ) - quantile_choices = rng.choice( - len(self.quantiles), size=target_rows.sum() - ) - draws = predictions[np.arange(target_rows.sum()), quantile_choices] - if column in self.nonnegative_vars: - draws = np.maximum(draws, 0.0) - values[target_rows] = draws - synthetic[column] = values - return synthetic - - -class RegimeAwareDonorImputer: - """Donor imputer that wraps `microimpute.ZeroInflatedImputer` per column. - - Each target is fit with an independent `ZeroInflatedImputer`, which - auto-detects one of seven regimes (THREE_SIGN / ZI_POSITIVE / - ZI_NEGATIVE / SIGN_ONLY / POSITIVE_ONLY / NEGATIVE_ONLY / - DEGENERATE_ZERO) from the training distribution and composes a - gate classifier + one or two base imputers as appropriate. - - Key advantages over `ColumnwiseQRFDonorImputer`: - - 1. Negative values in training are preserved in predictions for - three-sign targets (capital gains, partnership/S-corp income, - farm income, rental income). The v7 `y > 0` bug is structurally - impossible under regime-aware routing. - 2. Predictions on three-sign targets never land in the interior - band between ``max(train_neg)`` and ``min(train_pos)`` — the - tripartite gate routes to sign-specific base imputers that each - see only one sign of training data. - - This class is a thin columnwise adapter: one `ZeroInflatedImputer` - is fit per target, using `microimpute.QRF` as the base. Fit and - generate work column-by-column so memory scales with the single - largest base imputer, not with the total target count. - """ - - def __init__( - self, - condition_vars: list[str], - target_vars: list[str], - n_estimators: int = 100, - nonnegative_vars: set[str] | None = None, - classifier_type: str = "hist_gb", - min_class_count: int = 10, - min_class_fraction: float = 0.01, - seed: int = 42, - ) -> None: - self.condition_vars = list(condition_vars) - self.target_vars = list(target_vars) - self.n_estimators = int(n_estimators) - self.nonnegative_vars = set(nonnegative_vars or ()) - self.classifier_type = str(classifier_type) - self.min_class_count = int(min_class_count) - self.min_class_fraction = float(min_class_fraction) - self.seed = int(seed) - self._fitted: dict[str, Any] = {} - self._regimes: dict[str, str] = {} - - def fit( - self, - data: pd.DataFrame, - *, - weight_col: str | None = "weight", - epochs: int | None = None, - batch_size: int | None = None, - learning_rate: float | None = None, - verbose: bool = False, - ) -> RegimeAwareDonorImputer: - del weight_col, epochs, batch_size, learning_rate, verbose - - if importlib.util.find_spec("microimpute") is None: - raise ImportError( - "microimpute>=2.1 is required for donor_imputer_backend=" - "'regime_aware'; install with `uv pip install microimpute`." - ) - if importlib.util.find_spec("quantile_forest") is None: - raise ImportError( - "quantile-forest is required for the RegimeAwareDonorImputer " - "base QRF." - ) - - from microimpute.models.qrf import QRF - from microimpute.models.zero_inflated import ZeroInflatedImputer - - self._fitted = {} - self._regimes = {} - for column in self.target_vars: - subset = data[self.condition_vars + [column]].dropna() - if len(subset) < 25: - continue - # base_imputer_kwargs={} because microimpute 2.x's - # ZeroInflatedImputer._fit_base_single already passes - # log_level="ERROR" to the base, and duplicating it here - # raises TypeError. Upstream fix tracked. - wrapper = ZeroInflatedImputer( - base_imputer_class=QRF, - base_imputer_kwargs={}, - min_class_count=self.min_class_count, - min_class_fraction=self.min_class_fraction, - classifier_type=self.classifier_type, - seed=self.seed, - ) - fitted = wrapper.fit( - subset, - predictors=list(self.condition_vars), - imputed_variables=[column], - ) - self._fitted[column] = fitted - self._regimes[column] = wrapper.get_regime(column) - return self - - def generate( - self, - conditions: pd.DataFrame, - seed: int | None = None, - ) -> pd.DataFrame: - synthetic = conditions.copy().reset_index(drop=True) - master_seed = self.seed if seed is None else int(seed) - master_rng = np.random.default_rng(master_seed) - for column in self.target_vars: - fitted = self._fitted.get(column) - if fitted is None: - synthetic[column] = np.nan - continue - column_seed = int( - master_rng.integers(0, np.iinfo(np.int32).max, dtype=np.int64) - ) - self._reset_prediction_rngs(fitted, seed=column_seed) - preds = fitted.predict(synthetic[self.condition_vars]) - values = preds[column].to_numpy(dtype=float) - if column in self.nonnegative_vars: - values = np.maximum(values, 0.0) - synthetic[column] = values - return synthetic - - def _reset_prediction_rngs( - self, - obj: Any, - *, - seed: int, - visited: set[int] | None = None, - ) -> None: - if visited is None: - visited = set() - if obj is None or isinstance(obj, (str, bytes, int, float, bool)): - return - object_id = id(obj) - if object_id in visited: - return - visited.add(object_id) - - if hasattr(obj, "_rng"): - obj._rng = np.random.default_rng(seed) - child_rng = np.random.default_rng(seed) - - if isinstance(obj, dict): - children = list(obj.values()) - elif isinstance(obj, (list, tuple, set)): - children = list(obj) - else: - children = [] - for attr_name in ("models", "_per_variable", "_non_numeric_bundle"): - child = getattr(obj, attr_name, None) - if child is not None: - children.append(child) - - for child in children: - child_seed = int( - child_rng.integers(0, np.iinfo(np.int32).max, dtype=np.int64) - ) - self._reset_prediction_rngs( - child, - seed=child_seed, - visited=visited, - ) - - AGE_LABELS = ["0-17", "18-34", "35-54", "55-64", "65+"] INCOME_BINS = [-np.inf, 25_000, 50_000, 100_000, np.inf] INCOME_LABELS = ["<25k", "25-50k", "50-100k", "100k+"] diff --git a/tests/pipelines/test_regime_aware_donor_imputer.py b/tests/pipelines/test_regime_aware_donor_imputer.py index 34e5274..0649287 100644 --- a/tests/pipelines/test_regime_aware_donor_imputer.py +++ b/tests/pipelines/test_regime_aware_donor_imputer.py @@ -36,6 +36,7 @@ pytest.importorskip("quantile_forest") pytest.importorskip("microimpute") +pytest.importorskip("microimpute.models.zero_inflated") def _three_sign_frame_with_gap(