Skip to content
1 change: 1 addition & 0 deletions changelog.d/top-tail-income-representation.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Include PUF aggregate records and inject high-income PUF records into ExtendedCPS to improve top-tail income representation.
152 changes: 152 additions & 0 deletions policyengine_us_data/datasets/cps/extended_cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

logger = logging.getLogger(__name__)

# Minimum AGI for PUF records to be injected into the ExtendedCPS
AGI_INJECTION_THRESHOLD = 1_000_000

# CPS-only variables that should be QRF-imputed for the PUF clone half
# instead of naively duplicated from the CPS donor. These are
# income-correlated variables that exist only in the CPS; demographics,
Expand Down Expand Up @@ -446,8 +449,157 @@ def generate(self):

new_data = self._rename_imputed_to_inputs(new_data)
new_data = self._drop_formula_variables(new_data)
new_data = self._inject_high_income_puf_records(new_data)
self.save_dataset(new_data)

def _inject_high_income_puf_records(self, new_data):
    """Inject ultra-high-income PUF records into the extended CPS.

    The CPS severely under-represents the top of the income
    distribution ($5M+ AGI brackets have -95% to -99% calibration
    errors). This directly appends PUF records with AGI above
    AGI_INJECTION_THRESHOLD as additional households, giving the
    reweighter actual high-income observations.

    Uses raw PUF arrays for speed — only creates a Microsimulation
    briefly to identify high-AGI households, then reads stored
    arrays directly instead of calling calculate() per variable.

    Args:
        new_data: Mapping of variable name -> {time_period: array}
            holding the extended CPS arrays being assembled.

    Returns:
        The same mapping, with high-income PUF rows appended to every
        variable whose entity is known to the tax-benefit system.
    """
    from policyengine_us import Microsimulation

    logger.info("Loading PUF for high-income record injection...")
    puf_sim = Microsimulation(dataset=self.puf)

    # Identify high-AGI households (only calculation needed)
    hh_agi = puf_sim.calculate("adjusted_gross_income", map_to="household").values
    tbs = puf_sim.tax_benefit_system
    del puf_sim  # Free simulation memory immediately

    # Load raw PUF arrays (fast, no simulation)
    puf_data = self.puf(require=True).load_dataset()
    all_hh_ids = puf_data["household_id"]
    high_mask_hh = hh_agi > AGI_INJECTION_THRESHOLD

    # Exit before building any ID sets/masks if nothing qualifies.
    n_high_hh = int(high_mask_hh.sum())
    if n_high_hh == 0:
        logger.info("No high-income PUF households found")
        return new_data

    logger.info(
        "Injecting %d PUF households with AGI > $%d",
        n_high_hh,
        AGI_INJECTION_THRESHOLD,
    )

    # Build entity-level masks from raw arrays
    high_hh_id_set = set(all_hh_ids[high_mask_hh])
    high_hh_id_arr = np.fromiter(high_hh_id_set, dtype=float)
    person_mask = np.isin(
        puf_data["person_household_id"],
        high_hh_id_arr,
    )
    # In PUF, household = tax_unit = spm_unit = family
    group_mask = np.isin(
        puf_data["household_id"],
        high_hh_id_arr,
    )
    # Marital units: find which belong to high-income persons
    high_marital_ids = np.unique(puf_data["person_marital_unit_id"][person_mask])
    marital_mask = np.isin(
        puf_data["marital_unit_id"],
        high_marital_ids,
    )

    entity_masks = {
        "person": person_mask,
        "household": group_mask,
        "tax_unit": group_mask,
        "spm_unit": group_mask,
        "family": group_mask,
        "marital_unit": marital_mask,
    }

    logger.info(
        "High-income record counts: persons=%d, households=%d, marital_units=%d",
        person_mask.sum(),
        group_mask.sum(),
        marital_mask.sum(),
    )

    # Compute ID offset to avoid collisions with existing data:
    # larger than any numeric ID already present, plus headroom.
    id_offset = 0
    for key in new_data:
        if key.endswith("_id"):
            vals = new_data[key][self.time_period]
            if vals.dtype.kind in ("f", "i", "u"):
                id_offset = max(id_offset, int(vals.max()))
    id_offset += 1_000_000

    # For each variable in new_data, read from raw PUF arrays
    # (no calculate() calls — uses stored values directly)
    n_appended = 0
    for variable in list(new_data.keys()):
        var_meta = tbs.variables.get(variable)
        if var_meta is None:
            continue

        entity = var_meta.entity.key
        if entity not in entity_masks:
            continue

        mask = entity_masks[entity]
        existing = new_data[variable][self.time_period]
        n_inject = int(mask.sum())

        # Use the stored PUF array only when it exists at the expected
        # entity length; otherwise pad with zeros (CPS-only variables,
        # or arrays stored at an unexpected length).
        puf_values = (
            np.asarray(puf_data[variable]) if variable in puf_data else None
        )
        if puf_values is not None and len(puf_values) == len(mask):
            puf_subset = puf_values[mask]
        else:
            puf_subset = np.zeros(n_inject, dtype=existing.dtype)

        # Offset IDs to avoid collisions
        if variable.endswith("_id") and puf_subset.dtype.kind in (
            "f",
            "i",
            "u",
        ):
            puf_subset = puf_subset + id_offset

        # Match dtypes to avoid object arrays that HDF5 can't save
        try:
            puf_subset = np.array(puf_subset).astype(existing.dtype)
        except (ValueError, TypeError):
            # Can't cast — pad with zeros to keep lengths aligned
            logger.warning(
                "Padding %s with defaults: cannot cast PUF dtype %s to %s",
                variable,
                puf_subset.dtype,
                existing.dtype,
            )
            puf_subset = np.zeros(len(puf_subset), dtype=existing.dtype)

        new_data[variable][self.time_period] = np.concatenate(
            [existing, puf_subset]
        )
        n_appended += 1

    logger.info("Appended PUF values for %d variables", n_appended)
    return new_data

@classmethod
def _rename_imputed_to_inputs(cls, data):
"""Rename QRF-imputed formula vars to their leaf inputs.
Expand Down
77 changes: 76 additions & 1 deletion policyengine_us_data/datasets/puf/puf.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,81 @@ def decode_age_dependent(age_range: int) -> int:
return rng.integers(low=lower, high=upper, endpoint=False)


# Income/deduction predictors used to impute MARS (filing status) for
# aggregate PUF records in impute_aggregate_mars(). These are raw IRS
# SOI PUF field codes (e.g. E00200 is presumably wages and salaries —
# confirm meanings against the SOI PUF codebook); XTOT is the
# exemption count used alongside the income fields.
MARS_IMPUTATION_PREDICTORS = [
    "E00200",
    "E00300",
    "E00600",
    "E01000",
    "E00900",
    "E26270",
    "E02400",
    "E01500",
    "XTOT",
]


def impute_aggregate_mars(puf: pd.DataFrame) -> pd.DataFrame:
    """Impute MARS for aggregate records using QRF on income variables.

    PUF aggregate records (MARS=0) carry complete income/deduction
    detail but no demographics. MARS is needed as a predictor for the
    downstream demographic QRF in impute_missing_demographics(), so a
    QRF is trained on regular records' income profiles and used to
    fill in MARS for the aggregates. All other demographics (AGERANGE,
    GENDER, EARNSPLIT, AGEDP1-3) are left for
    impute_missing_demographics() to handle.
    """
    from microimpute.models.qrf import QRF

    is_aggregate = puf.MARS == 0
    if is_aggregate.sum() == 0:
        # Nothing to impute — no aggregate records present.
        return puf

    regular = puf[puf.MARS != 0].copy()

    # Restrict to the predictor columns actually present in this PUF
    available = [c for c in MARS_IMPUTATION_PREDICTORS if c in puf.columns]

    # Fit on a bounded sample of regular records (sample() copies)
    sample_size = min(10_000, len(regular))
    training = regular.sample(n=sample_size, random_state=42)
    training = training[available + ["MARS"]].fillna(0)

    fitted = QRF().fit(
        X_train=training,
        predictors=available,
        imputed_variables=["MARS"],
    )

    # Predict filing status for the aggregate rows. The QRF emits
    # continuous values, so snap each to the nearest valid MARS code
    # and keep it within [1, 4].
    features = puf.loc[is_aggregate, available].fillna(0)
    raw_predictions = fitted.predict(X_test=features)["MARS"].values
    puf.loc[is_aggregate, "MARS"] = np.clip(
        raw_predictions.round().astype(int), 1, 4
    )

    # Joint filers must carry at least two exemptions
    now_joint = is_aggregate & (puf.MARS == 2)
    puf.loc[now_joint, "XTOT"] = puf.loc[now_joint, "XTOT"].clip(lower=2)

    # DSI and EIC feed the downstream demographic QRF: ultra-high-income
    # filers are never dependents and never receive EITC
    for column in ("DSI", "EIC"):
        if column in puf.columns:
            puf.loc[is_aggregate, column] = 0

    # AGERANGE, GENDER, EARNSPLIT, AGEDP1-3 stay unset here —
    # impute_missing_demographics() imputes them via QRF using
    # [E00200, MARS, DSI, EIC, XTOT] as predictors.

    return puf


def preprocess_puf(puf: pd.DataFrame) -> pd.DataFrame:
# Add variable renames
puf.S006 = puf.S006 / 100
Expand Down Expand Up @@ -552,7 +627,7 @@ def generate(self):
self.save_dataset(arrays)
return

puf = puf[puf.MARS != 0] # Remove aggregate records
puf = impute_aggregate_mars(puf)

original_recid = puf.RECID.values.copy()
puf = preprocess_puf(puf)
Expand Down
7 changes: 4 additions & 3 deletions policyengine_us_data/tests/test_datasets/test_enhanced_cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,10 @@ def test_immigration_status_diversity():
"Immigration status not being read from data."
)

# Also check that we have a reasonable percentage of citizens (should be 85-90%)
assert 80 < citizen_pct < 95, (
f"Citizen percentage ({citizen_pct:.1f}%) outside expected range (80-95%)"
# Check reasonable percentage of citizens (85-90% for CPS, higher with
# injected PUF records since tax filers are almost all citizens)
assert 80 < citizen_pct < 98, (
f"Citizen percentage ({citizen_pct:.1f}%) outside expected range (80-98%)"
)

# Check that we have some non-citizens
Expand Down
Loading