Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog_entry.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
- bump: minor
changes:
added:
- weeks_unemployed variable from CPS ASEC LKWEEKS
- QRF-based imputation of weeks_unemployed for Extended CPS PUF copy
1 change: 1 addition & 0 deletions policyengine_us_data/datasets/cps/census_cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ class CensusCPS_2018(CensusCPS):
"RNT_VAL",
"SS_VAL",
"UC_VAL",
"LKWEEKS", # Weeks looking for work during the year (Census variable)
"ANN_VAL",
"PNSN_VAL",
"OI_OFF",
Expand Down
4 changes: 4 additions & 0 deletions policyengine_us_data/datasets/cps/cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,10 @@ def add_personal_income_variables(
cps["social_security_retirement"]
)
cps["unemployment_compensation"] = person.UC_VAL
# Weeks looking for work during the year (Census variable LKWEEKS)
# LKWEEKS: -1 = NIU (Not In Universe), 0 = not looking, 1-52 = weeks
weeks_raw = person.LKWEEKS
cps["weeks_unemployed"] = np.where(weeks_raw == -1, 0, weeks_raw)
# Add pensions and annuities.
cps_pensions = person.PNSN_VAL + person.ANN_VAL
# Assume a constant fraction of pension income is taxable.
Expand Down
109 changes: 109 additions & 0 deletions policyengine_us_data/datasets/cps/extended_cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,12 @@ def generate(self):
data = cps_sim.dataset.load_dataset()
new_data = {}

# Pre-compute weeks_unemployed imputation for PUF copy
# Preserve relationship between UC and weeks from CPS
puf_weeks_unemployed = impute_weeks_unemployed_for_puf(
cps_sim, y_full_imputations
)

for variable in list(data) + IMPUTED_VARIABLES:
variable_metadata = cps_sim.tax_benefit_system.variables.get(
variable
Expand Down Expand Up @@ -206,6 +212,9 @@ def generate(self):
values = np.concatenate([values, values + values.max()])
elif "_weight" in variable:
values = np.concatenate([values, values * 0])
elif variable == "weeks_unemployed":
# Use imputed weeks for PUF copy to preserve UC relationship
values = np.concatenate([values, puf_weeks_unemployed])
else:
values = np.concatenate([values, values])
new_data[variable] = {
Expand Down Expand Up @@ -320,6 +329,106 @@ def impute_income_variables(
return result


def impute_weeks_unemployed_for_puf(cps_sim, puf_imputations):
    """
    Impute weeks_unemployed for the PUF copy using a QRF trained on CPS data.

    Uses microimpute's Quantile Random Forest to impute weeks_unemployed
    for PUF records based on CPS data, preserving the joint distribution
    of weeks with UC, age, and other predictors.

    This is the reverse of the income imputation (CPS -> PUF instead of
    PUF -> CPS) because weeks_unemployed exists in CPS but not in PUF.

    Args:
        cps_sim: Simulation over the CPS data. Must provide
            ``calculate(name)`` (result exposes ``.values``) and
            ``calculate_dataframe(names)``.
        puf_imputations: DataFrame of imputed PUF variables. If it has a
            ``taxable_unemployment_compensation`` column, that column is
            used as the ``unemployment_compensation`` predictor for the
            PUF copy. Assumed to be row-aligned with the CPS persons
            (its values are assigned positionally) -- TODO confirm.

    Returns:
        Array of imputed weeks, clipped to [0, 52] and forced to 0 where
        the imputed UC is not positive. If ``weeks_unemployed`` cannot be
        calculated from ``cps_sim``, an array of zeros with one entry per
        row of ``puf_imputations``.
    """
    try:
        cps_weeks = cps_sim.calculate("weeks_unemployed").values
    except (ValueError, KeyError):
        logging.warning(
            "weeks_unemployed not available in CPS, "
            "returning zeros for PUF copy"
        )
        return np.zeros(len(puf_imputations.index))

    # Demographic predictors available in both CPS and imputed PUF data.
    base_predictors = [
        "age",
        "is_male",
        "tax_unit_is_joint",
        "is_tax_unit_head",
        "is_tax_unit_spouse",
        "is_tax_unit_dependent",
    ]
    # Evaluate the availability of imputed UC once; it drives both the
    # training and the prediction frames below.
    has_imputed_uc = (
        "taxable_unemployment_compensation" in puf_imputations.columns
    )

    # Training data: CPS demographics plus the observed CPS weeks.
    X_train = cps_sim.calculate_dataframe(base_predictors)
    X_train["weeks_unemployed"] = cps_weeks

    # Prediction data: the PUF copy reuses the CPS demographics.
    X_test = cps_sim.calculate_dataframe(base_predictors)

    predictors = list(base_predictors)
    if has_imputed_uc:
        # UC is a strong predictor: train on reported CPS UC, predict
        # with the UC imputed for the PUF copy.
        X_train["unemployment_compensation"] = cps_sim.calculate(
            "unemployment_compensation"
        ).values
        X_test["unemployment_compensation"] = puf_imputations[
            "taxable_unemployment_compensation"
        ].values
        predictors.append("unemployment_compensation")

    logging.info(
        f"Imputing weeks_unemployed using QRF with "
        f"predictors: {predictors}"
    )

    qrf = QRF(
        log_level="INFO",
        memory_efficient=True,
    )

    # Cap the training set at 5000 rows for efficiency; the fixed seed
    # keeps the sample reproducible.
    max_train_rows = 5000
    if len(X_train) > max_train_rows:
        X_train_sampled = X_train.sample(n=max_train_rows, random_state=42)
    else:
        X_train_sampled = X_train

    fitted_model = qrf.fit(
        X_train=X_train_sampled,
        predictors=predictors,
        imputed_variables=["weeks_unemployed"],
        n_jobs=1,
    )

    predictions = fitted_model.predict(X_test=X_test)
    imputed_weeks = predictions["weeks_unemployed"].values

    # Enforce constraints: 0-52 weeks, 0 if no UC.
    imputed_weeks = np.clip(imputed_weeks, 0, 52)
    if has_imputed_uc:
        imputed_weeks = np.where(
            X_test["unemployment_compensation"].values > 0,
            imputed_weeks,
            0,
        )

    # Guard the mean: with no positive weeks the slice is empty, and
    # ndarray.mean() would emit a RuntimeWarning and log "nan".
    positive_weeks = imputed_weeks[imputed_weeks > 0]
    mean_positive = positive_weeks.mean() if positive_weeks.size else 0.0
    logging.info(
        f"Imputed weeks_unemployed for PUF: "
        f"{positive_weeks.size} with weeks > 0, "
        f"mean = {mean_positive:.1f} weeks"
    )

    return imputed_weeks


class ExtendedCPS_2024(ExtendedCPS):
cps = CPS_2024_Full
puf = PUF_2024
Expand Down
86 changes: 86 additions & 0 deletions tests/test_weeks_unemployed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
"""
Tests for weeks_unemployed variable extraction from CPS ASEC.

The Census CPS ASEC uses LKWEEKS (not IPUMS's WKSUNEM1) for weeks looking for work.
"""

import numpy as np
from pathlib import Path


class TestWeeksUnemployed:
    """Checks for the weeks_unemployed variable and its PUF imputation rules."""

    @staticmethod
    def _read_source(relative_path):
        """Return the text of a repository file given its repo-relative path."""
        repo_root = Path(__file__).parent.parent
        return (repo_root / relative_path).read_text()

    def test_lkweeks_in_person_columns(self):
        """census_cps.py should list LKWEEKS and never WKSUNEM."""
        # Inspect the source text directly rather than importing the module.
        source = self._read_source(
            "policyengine_us_data/datasets/cps/census_cps.py"
        )

        assert '"LKWEEKS"' in source, "LKWEEKS should be in PERSON_COLUMNS"
        assert (
            '"WKSUNEM"' not in source
        ), "WKSUNEM should not be in PERSON_COLUMNS (Census uses LKWEEKS)"

    def test_cps_uses_lkweeks(self):
        """cps.py should reference LKWEEKS and never WKSUNEM."""
        source = self._read_source("policyengine_us_data/datasets/cps/cps.py")

        assert "LKWEEKS" in source, "cps.py should reference LKWEEKS"
        assert "WKSUNEM" not in source, "cps.py should not reference WKSUNEM"

    def test_weeks_unemployed_value_range(self):
        """Mapping the -1 code to 0 should leave values within 0-52."""
        # LKWEEKS codes: -1 = NIU, 0 = not unemployed, 1-52 = weeks.
        lkweeks = np.array([-1, 0, 1, 26, 52, -1])
        cleaned = np.where(lkweeks == -1, 0, lkweeks)

        assert cleaned.min() >= 0, "Minimum should be >= 0"
        assert cleaned.max() <= 52, "Maximum should be <= 52"
        assert cleaned[0] == 0, "NIU (-1) should map to 0"
        assert cleaned[1] == 0, "Not unemployed (0) should stay 0"
        assert cleaned[3] == 26, "26 weeks should stay 26"

    def test_puf_weeks_imputation_constraints(self):
        """Clip-then-zero should bound weeks to [0, 52] and zero them without UC."""
        # Sample inputs cover a negative value, values above 52, and zero UC.
        predicted_weeks = np.array([-5, 0, 25, 60, 100])
        uc_amounts = np.array([100, 0, 5000, 10000, 0])

        # Same two steps as the imputation: bound first, then zero where UC is 0.
        bounded = np.clip(predicted_weeks, 0, 52)
        result = np.where(uc_amounts > 0, bounded, 0)

        assert result.min() >= 0, "Should be non-negative"
        assert result.max() <= 52, "Should be capped at 52 weeks"
        assert result[1] == 0, "No UC should mean 0 weeks"
        assert result[4] == 0, "No UC should mean 0 weeks"
        assert result[2] == 25, "Valid weeks with UC should be preserved"

    def test_extended_cps_handles_weeks_unemployed(self):
        """extended_cps.py should mention the variable and its imputation function."""
        source = self._read_source(
            "policyengine_us_data/datasets/cps/extended_cps.py"
        )

        assert (
            "weeks_unemployed" in source
        ), "extended_cps.py should handle weeks_unemployed"
        assert (
            "impute_weeks_unemployed_for_puf" in source
        ), "Should have imputation function for PUF weeks"