Skip to content

Commit

Permalink
Merge pull request PSLmodels#380 from jdebacker/multi_taxcalc
Browse files Browse the repository at this point in the history
Parallelize taxcalc increment year
  • Loading branch information
jdebacker authored May 8, 2018
2 parents cd535ca + 120c67f commit 45af601
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 38 deletions.
83 changes: 46 additions & 37 deletions ogusa/get_micro_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@
./TAX_ESTIMATE_PATH/TxFuncEst_policy{}.pkl
------------------------------------------------------------------------
'''
import sys
import taxcalc
from taxcalc import *
import pandas as pd
from pandas import DataFrame
from dask.distributed import Client
from dask import compute, delayed
import dask.multiprocessing
import numpy as np
import copy
import numba
import pickle
from ogusa.utils import DEFAULT_START_YEAR, TC_LAST_YEAR
from ogusa.utils import RECORDS_START_YEAR
Expand Down Expand Up @@ -92,7 +90,7 @@ def get_calculator(baseline, calculator_start_year, reform=None,


def get_data(baseline=False, start_year=DEFAULT_START_YEAR, reform={},
data=None):
data=None, client=None, num_workers=1):
'''
--------------------------------------------------------------------
This function creates dataframes of micro data from the
Expand Down Expand Up @@ -164,37 +162,23 @@ def get_data(baseline=False, start_year=DEFAULT_START_YEAR, reform={},

# Repeat the process for each year
# Increment years into the future but not beyond TC_LAST_YEAR
for i in range(TC_LAST_YEAR - start_year):
calc1.increment_year()
[mtr_fica, mtr_iit, mtr_combined] = calc1.mtr('e00200p')
[mtr_fica_sey, mtr_iit_sey, mtr_combined_sey] =\
calc1.mtr('e00900p')
# find mtr on capital income
mtr_combined_capinc = cap_inc_mtr(calc1)


temp = np.empty((length, 11))
temp[:, 0] = mtr_combined
temp[:, 1] = mtr_combined_sey
temp[:, 2] = mtr_combined_capinc
temp[:, 3] = calc1.array('age_head')
temp[:, 4] = calc1.array('e00200')
temp[:, 5] = calc1.array('sey')
temp[:, 6] = calc1.array('sey') + calc1.array('e00200')
temp[:, 7] = calc1.array('expanded_income')
temp[:, 8] = calc1.array('combined')
temp[:, 9] = calc1.current_year * np.ones(length)
temp[:, 10] = calc1.array('s006')

micro_data_dict[str(calc1.current_year)] = DataFrame(
data=temp, columns=['MTR wage income', 'MTR SE income',
'MTR capital income', 'Age',
'Wage income', 'SE income',
'Wage + SE income',
'Adjusted total income',
'Total tax liability', 'Year',
'Weights'])
print('year: ', str(calc1.current_year))
lazy_values = []
for year in range(start_year + 1, TC_LAST_YEAR + 1):
lazy_values.append(
delayed(taxcalc_advance)(calc1, year, length))
results = compute(*lazy_values, get=dask.multiprocessing.get,
num_workers=num_workers)
# for i, result in results.items():
for i, result in enumerate(results):
year = start_year + 1 + i
micro_data_dict[str(year)] = DataFrame(
data=result, columns=['MTR wage income', 'MTR SE income',
'MTR capital income', 'Age',
'Wage income', 'SE income',
'Wage + SE income',
'Adjusted total income',
'Total tax liability', 'Year',
'Weights'])

if reform:
pkl_path = "micro_data_policy.pkl"
Expand All @@ -205,6 +189,31 @@ def get_data(baseline=False, start_year=DEFAULT_START_YEAR, reform={},
return micro_data_dict


def taxcalc_advance(calc1, year, length):
calc1.advance_to_year(year)
print('year: ', str(calc1.current_year))
[mtr_fica, mtr_iit, mtr_combined] = calc1.mtr('e00200p')
[mtr_fica_sey, mtr_iit_sey, mtr_combined_sey] =\
calc1.mtr('e00900p')
# find mtr on capital income
mtr_combined_capinc = cap_inc_mtr(calc1)

temp = np.empty((length, 11))
temp[:, 0] = mtr_combined
temp[:, 1] = mtr_combined_sey
temp[:, 2] = mtr_combined_capinc
temp[:, 3] = calc1.array('age_head')
temp[:, 4] = calc1.array('e00200')
temp[:, 5] = calc1.array('sey')
temp[:, 6] = calc1.array('sey') + calc1.array('e00200')
temp[:, 7] = calc1.array('expanded_income')
temp[:, 8] = calc1.array('combined')
temp[:, 9] = calc1.current_year * np.ones(length)
temp[:, 10] = calc1.array('s006')

return temp


def cap_inc_mtr(calc1):
# find mtr on capital income
capital_income_sources_taxed = (
Expand Down
4 changes: 3 additions & 1 deletion ogusa/txfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -1528,7 +1528,9 @@ def tax_func_estimate(BW, S, starting_age, ending_age,
# call tax caculator and get microdata
micro_data = get_micro_data.get_data(baseline=baseline,
start_year=beg_yr,
reform=reform, data=data)
reform=reform, data=data,
client=client,
num_workers=num_workers)

lazy_values = []
for t in years_list:
Expand Down

0 comments on commit 45af601

Please sign in to comment.