diff --git a/changelog.d/top-tail-income-representation.added.md b/changelog.d/top-tail-income-representation.added.md new file mode 100644 index 00000000..e7b5bea5 --- /dev/null +++ b/changelog.d/top-tail-income-representation.added.md @@ -0,0 +1 @@ +Include PUF aggregate records and inject high-income PUF records into ExtendedCPS to improve top-tail income representation. diff --git a/policyengine_us_data/datasets/cps/extended_cps.py b/policyengine_us_data/datasets/cps/extended_cps.py index f38d5746..094049fe 100644 --- a/policyengine_us_data/datasets/cps/extended_cps.py +++ b/policyengine_us_data/datasets/cps/extended_cps.py @@ -16,6 +16,9 @@ logger = logging.getLogger(__name__) +# Minimum AGI for PUF records to be injected into the ExtendedCPS +AGI_INJECTION_THRESHOLD = 1_000_000 + # CPS-only variables that should be QRF-imputed for the PUF clone half # instead of naively duplicated from the CPS donor. These are # income-correlated variables that exist only in the CPS; demographics, @@ -446,8 +449,153 @@ def generate(self): new_data = self._rename_imputed_to_inputs(new_data) new_data = self._drop_formula_variables(new_data) + new_data = self._inject_high_income_puf_records(new_data) self.save_dataset(new_data) + def _inject_high_income_puf_records(self, new_data): + """Inject ultra-high-income PUF records into the extended CPS. + + The CPS severely under-represents the top of the income + distribution ($5M+ AGI brackets have -95% to -99% calibration + errors). This directly appends PUF records with AGI above + AGI_INJECTION_THRESHOLD as additional households, giving the + reweighter actual high-income observations. + + Uses raw PUF arrays for speed — only creates a Microsimulation + briefly to identify high-AGI households, then reads stored + arrays directly instead of calling calculate() per variable. 
+ """ + from policyengine_us import Microsimulation + + logger.info("Loading PUF for high-income record injection...") + puf_sim = Microsimulation(dataset=self.puf) + + # Identify high-AGI households (only calculation needed) + hh_agi = puf_sim.calculate("adjusted_gross_income", map_to="household").values + tbs = puf_sim.tax_benefit_system + del puf_sim # Free simulation memory immediately + + # Load raw PUF arrays (fast, no simulation) + puf_data = self.puf(require=True).load_dataset() + all_hh_ids = puf_data["household_id"] + high_mask_hh = hh_agi > AGI_INJECTION_THRESHOLD + high_hh_id_set = set(all_hh_ids[high_mask_hh]) + + n_high_hh = int(high_mask_hh.sum()) + if n_high_hh == 0: + logger.info("No high-income PUF households found") + return new_data + + logger.info( + "Injecting %d PUF households with AGI > $%d", + n_high_hh, + AGI_INJECTION_THRESHOLD, + ) + + # Build entity-level masks from raw arrays + high_hh_id_arr = np.fromiter(high_hh_id_set, dtype=float) + person_mask = np.isin( + puf_data["person_household_id"], + high_hh_id_arr, + ) + # In PUF, household = tax_unit = spm_unit = family + group_mask = np.isin( + puf_data["household_id"], + high_hh_id_arr, + ) + # Marital units: find which belong to high-income persons + high_marital_ids = np.unique(puf_data["person_marital_unit_id"][person_mask]) + marital_mask = np.isin( + puf_data["marital_unit_id"], + high_marital_ids, + ) + + entity_masks = { + "person": person_mask, + "household": group_mask, + "tax_unit": group_mask, + "spm_unit": group_mask, + "family": group_mask, + "marital_unit": marital_mask, + } + + logger.info( + "High-income record counts: persons=%d, households=%d, marital_units=%d", + person_mask.sum(), + group_mask.sum(), + marital_mask.sum(), + ) + + # Compute ID offset to avoid collisions with existing data + id_offset = 0 + for key in new_data: + if key.endswith("_id"): + vals = new_data[key][self.time_period] + if vals.dtype.kind in ("f", "i", "u"): + id_offset = max(id_offset, 
 int(vals.max())) + id_offset += 1_000_000 + + # For each variable in new_data, read from raw PUF arrays + # (no calculate() calls — uses stored values directly) + n_appended = 0 + for variable in list(new_data.keys()): + var_meta = tbs.variables.get(variable) + if var_meta is None: + continue + + entity = var_meta.entity.key + if entity not in entity_masks: + continue + + mask = entity_masks[entity] + + # Read from raw PUF arrays if available; pad with + # defaults for variables not in PUF (CPS-only variables) + existing = new_data[variable][self.time_period] + n_inject = int(mask.sum()) + + # For string/enum vars, use the first existing value + # as default (empty string is not a valid enum value) + if existing.dtype.kind in ("S", "U"): + default_val = np.full(n_inject, existing[0], dtype=existing.dtype) + else: + default_val = np.zeros(n_inject, dtype=existing.dtype) + + use_puf = ( + variable in puf_data + and len(np.asarray(puf_data[variable])) == len(mask) + and np.asarray(puf_data[variable]).dtype.kind == existing.dtype.kind + ) + + if use_puf: + puf_values = np.asarray(puf_data[variable]) + puf_subset = puf_values[mask] + else: + puf_subset = default_val + + # Offset IDs to avoid collisions + if variable.endswith("_id") and puf_subset.dtype.kind in ( + "f", + "i", + "u", + ): + puf_subset = puf_subset + id_offset + + # Ensure dtype matches existing array + if puf_subset.dtype != existing.dtype: + try: + puf_subset = puf_subset.astype(existing.dtype) + except (ValueError, TypeError): + puf_subset = default_val + + new_data[variable][self.time_period] = np.concatenate( + [existing, puf_subset] + ) + n_appended += 1 + + logger.info("Appended PUF values for %d variables", n_appended) + return new_data + + @classmethod + def _rename_imputed_to_inputs(cls, data): + """Rename QRF-imputed formula vars to their leaf inputs. 
diff --git a/policyengine_us_data/datasets/puf/puf.py b/policyengine_us_data/datasets/puf/puf.py index 040098c1..8fb6a82a 100644 --- a/policyengine_us_data/datasets/puf/puf.py +++ b/policyengine_us_data/datasets/puf/puf.py @@ -300,6 +300,81 @@ def decode_age_dependent(age_range: int) -> int: return rng.integers(low=lower, high=upper, endpoint=False) +MARS_IMPUTATION_PREDICTORS = [ + "E00200", + "E00300", + "E00600", + "E01000", + "E00900", + "E26270", + "E02400", + "E01500", + "XTOT", +] + + +def impute_aggregate_mars(puf: pd.DataFrame) -> pd.DataFrame: + """Impute MARS for aggregate records using QRF on income variables. + + PUF aggregate records (MARS=0) have complete income/deduction data + but no demographics. MARS is needed as a predictor for the + downstream demographic QRF in impute_missing_demographics(). + + We train a QRF on regular records' income profiles to predict + MARS, then impute it for the aggregate records. All other + demographics (AGERANGE, GENDER, EARNSPLIT, AGEDP1-3) are left + for impute_missing_demographics() to handle. 
+ """ + from microimpute.models.qrf import QRF + + agg_mask = puf.MARS == 0 + n_agg = agg_mask.sum() + if n_agg == 0: + return puf + + reg = puf[puf.MARS != 0].copy() + + # Use available income variables as predictors for MARS + predictors = [c for c in MARS_IMPUTATION_PREDICTORS if c in puf.columns] + + # Train on a sample of regular records (sample() returns a copy) + train = reg.sample(n=min(10_000, len(reg)), random_state=42)[ + predictors + ["MARS"] + ].fillna(0) + + qrf = QRF() + fitted = qrf.fit( + X_train=train, + predictors=predictors, + imputed_variables=["MARS"], + ) + + # Predict MARS for aggregate records + agg_data = puf.loc[agg_mask, predictors].fillna(0) + predicted = fitted.predict(X_test=agg_data) + # QRF outputs continuous values; round to nearest valid MARS + mars_imputed = predicted["MARS"].values.round().astype(int) + mars_imputed = np.clip(mars_imputed, 1, 4) + puf.loc[agg_mask, "MARS"] = mars_imputed + + # Adjust XTOT for joint filers (need at least 2 exemptions) + joint_agg = agg_mask & (puf.MARS == 2) + puf.loc[joint_agg, "XTOT"] = puf.loc[joint_agg, "XTOT"].clip(lower=2) + + # DSI and EIC are predictors in the downstream demographic QRF: + # ultra-high-income filers are never dependents and never get EITC + if "DSI" in puf.columns: + puf.loc[agg_mask, "DSI"] = 0 + if "EIC" in puf.columns: + puf.loc[agg_mask, "EIC"] = 0 + + # AGERANGE, GENDER, EARNSPLIT, AGEDP1-3 are left unset — + # impute_missing_demographics() handles them via QRF using + # [E00200, MARS, DSI, EIC, XTOT] as predictors. 
+ + return puf + + def preprocess_puf(puf: pd.DataFrame) -> pd.DataFrame: # Add variable renames puf.S006 = puf.S006 / 100 @@ -552,7 +627,7 @@ def generate(self): self.save_dataset(arrays) return - puf = puf[puf.MARS != 0] # Remove aggregate records + puf = impute_aggregate_mars(puf) original_recid = puf.RECID.values.copy() puf = preprocess_puf(puf) diff --git a/policyengine_us_data/tests/test_datasets/test_enhanced_cps.py b/policyengine_us_data/tests/test_datasets/test_enhanced_cps.py index 298de5a4..e6673fbd 100644 --- a/policyengine_us_data/tests/test_datasets/test_enhanced_cps.py +++ b/policyengine_us_data/tests/test_datasets/test_enhanced_cps.py @@ -216,9 +216,10 @@ def test_immigration_status_diversity(): "Immigration status not being read from data." ) - # Also check that we have a reasonable percentage of citizens (should be 85-90%) - assert 80 < citizen_pct < 95, ( - f"Citizen percentage ({citizen_pct:.1f}%) outside expected range (80-95%)" + # Check reasonable percentage of citizens (85-90% for CPS, higher with + # injected PUF records since tax filers are almost all citizens) + assert 80 < citizen_pct < 98, ( + f"Citizen percentage ({citizen_pct:.1f}%) outside expected range (80-98%)" ) # Check that we have some non-citizens