
Rev fr rules #32

Merged
merged 15 commits into from Feb 21, 2024
rename file
mfebrizio committed Feb 17, 2024
commit 96d6572ce3934609fb0e65436d24d2192c6d4ecf
@@ -6,23 +6,23 @@
 from pandas import DataFrame, to_datetime, concat
 from numpy import array
 
-from federal_register_api import query_endpoint_documents
+from fr_toolbelt.api_requests import get_documents_by_date
 
+#from federal_register_api import query_endpoint_documents
 from search_columns import search_columns
 
 
 # set file paths
 p = Path(__file__)
 MAIN_DIR = p.parents[1]  # main folder for Reg Stats chart; store output data here
-API_DIR = p.parents[1].joinpath("API")  # folder for storing retrieved API data
+API_DIR = p.parents[1].joinpath("_api")  # folder for storing retrieved API data
 
 # set constants
-YEAR_RANGE = list(map(str, range(1995, date.today().year)))
+YEAR_RANGE = [f"{yr}" for yr in range(1995, date.today().year)]
 FIELDS = ['action', 'agencies', 'agency_names', 'citation', 'correction_of', 'corrections',
-          'docket_ids', 'document_number', 'json_url', 'page_length', 'president', 'publication_date',
-          'regulation_id_numbers', 'significant', 'title', 'type'
+          'document_number', 'json_url', 'president', 'publication_date', 'title', 'type',
           ]
-SAVE_NAME_CSV = "federal_register_rules_presidential_year.csv"
-#REPLACE_EXISTING = True
+SAVE_NAME_CSV = "federal_register_rules_by_presidential_year.csv"
 
 
 # ------------------
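A side note on the YEAR_RANGE change: the f-string comprehension is equivalent to the old map(str, ...) construction, just more explicit. A minimal standalone check (nothing below is from the repo):

from datetime import date

# Both forms build the same list of year strings, e.g.
# ["1995", "1996", ..., "2023"] when run during 2024.
years_map = list(map(str, range(1995, date.today().year)))
years_fstr = [f"{yr}" for yr in range(1995, date.today().year)]
assert years_map == years_fstr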
@@ -44,36 +44,35 @@ def retrieve_documents(years: list, doctype: str, fields: list, save_path: Path,
     # set file names
     file_name = save_path / f"documents_endpoint_{doctype}_{years[0]}_{years[-1]}.json"
     last_year = int(years[-1]) + 1
-    file_name_jan = save_path / f"documents_endpoint_{doctype}_{last_year}_Jan.json"
 
     # check if file already exists
     # don't overwrite unless replace_existing is True
-    if not (file_name.is_file() and file_name_jan.is_file()) or replace_existing:
+    if (not file_name.is_file()) or replace_existing:
         type_list = [doctype.upper()]  # API parameter accepts all uppercase
 
-        # query endpoint for non-Jan documents
+        # query endpoint for documents
         print(f"Retrieving type = {doctype}...")
-        documents = query_endpoint_documents(years, doctypeList=type_list, fieldsList=fields)
+        start_date = f"{years[0]}-01-01"
+        end_date = f"{last_year}-01-31"
+        documents, _ = get_documents_by_date(
+            start_date,
+            end_date,
+            document_types=type_list,
+            fields=fields,
+            handle_duplicates="flag"
+            )
+
+        dups = len(list(r for r in documents if r.get("duplicate") == True))
+        if dups > 0:
+            print(f"Flagged {dups} duplicate documents.")
 
         # save json file
         with open(file_name, 'w', encoding='utf-8') as f:
             json.dump(documents, f, indent=4)
 
-        # query endpoint for Jan documents
-        date_range = (f"{last_year}-01-01", f"{last_year}-01-31")
-        documents_jan = query_endpoint_documents(date_range,
-                                                 doctypeList=doctype,
-                                                 fieldsList=fields,
-                                                 by_year=False)
-
-        # save json file
-        with open(file_name_jan, 'w', encoding='utf-8') as f:
-            json.dump(documents_jan, f, indent=4)
 
     # return list of documents
     print("Retrieved documents and exported as JSON!")
-    return documents["results"] + documents_jan["results"]
+    return documents
 
 
 def format_documents(documents: list[dict]):
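The substantive change in this hunk is swapping the local federal_register_api helper for fr_toolbelt.api_requests.get_documents_by_date, which covers the whole February-through-January window in one call instead of a main query plus a separate January query. A minimal sketch of the new retrieval path, mirroring the call in the diff (the second element of the returned tuple is discarded above; treating it as a document count is an assumption here):

from fr_toolbelt.api_requests import get_documents_by_date

# One request spans the full presidential-year window; with
# handle_duplicates="flag", duplicates are kept but marked
# with a "duplicate" key on each affected record.
documents, _ = get_documents_by_date(
    "2023-01-01",
    "2024-01-31",
    document_types=["RULE"],
    fields=["document_number", "publication_date", "title", "type"],
    handle_duplicates="flag",
)
dups = sum(1 for r in documents if r.get("duplicate"))
print(f"Retrieved {len(documents)} documents; flagged {dups} duplicates.")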
@@ -87,7 +86,6 @@ def format_documents(documents: list[dict]):
     """
     # create dataframe
     df = DataFrame(documents)
-    #df.info()
 
     # convert publication date to datetime format
     df['publication_dt'] = to_datetime(df['publication_date'])
@@ -117,20 +115,21 @@ def filter_documents(df: DataFrame):
     """
     # get original column names
     cols = df.columns.tolist()
     #print(cols)
 
     # filter out corrections
     # 1. Using correction fields
     bool_na = array(df["correction_of"].isna())
     #df_filtered = df.loc[bool_na, :]  # keep when correction_of is missing
-    print(f"correction_of missing: {sum(bool_na)}")
+    #print(f"correction_of missing: {sum(bool_na)}")
 
     # 2. Searching other fields
-    search_1 = search_columns(df, [r"^C[\d]"], ["document_number"],
+    search_1 = search_columns(df, [r"^[a-z][\d]-[\w]{2,4}-"], ["document_number"],
                               return_column="indicator1")
     search_2 = search_columns(df, [r"(?:;\scorrection\b)|(?:\bcorrecting\samend[\w]+\b)"], ["title", "action"],
                               return_column="indicator2")
     bool_search = array(search_1["indicator1"] == 1) | array(search_2["indicator2"] == 1)
-    print(f"Flagged documents: {sum(bool_search)}")
+    #print(f"Flagged documents: {sum(bool_search)}")
+    #bool_search = array(search_2["indicator2"] == 1)
 
     # separate corrections from non-corrections
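The revised pattern ^[a-z][\d]-[\w]{2,4}- targets correction-style document numbers (e.g. c1-24-5678) rather than the old ^C[\d] prefix, while the title/action pattern catches phrases like "; correction" and "correcting amendment". search_columns is a local helper not shown in this diff; a rough plain-pandas approximation of the flagging it appears to perform (toy data, hypothetical document numbers):

from pandas import DataFrame

df = DataFrame({
    "document_number": ["2024-01234", "c1-24-5678"],
    "title": ["Energy Standards", "Energy Standards; correction"],
    "action": ["Final rule.", "Correcting amendment."],
})

# 1. correction-style document numbers
flag_num = df["document_number"].str.contains(r"^[a-z][\d]-[\w]{2,4}-", regex=True)

# 2. correction language in the title or action fields
pattern = r"(?:;\scorrection\b)|(?:\bcorrecting\samend[\w]+\b)"
flag_text = (
    df["title"].str.contains(pattern, case=False, regex=True)
    | df["action"].str.contains(pattern, case=False, regex=True)
)

print(df.loc[~(flag_num | flag_text)])  # keep non-corrections only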
@@ -147,7 +146,7 @@ def filter_documents(df: DataFrame):
           sep="\n")
 
 
-def group_documents(df: DataFrame, return_column: str):
+def group_documents(df: DataFrame, group_column: str = 'presidential_year', value_column: str = 'document_number', return_column: str = None):
     """Group Federal Register documents by presidential year.
     A [presidential year](https://regulatorystudies.columbian.gwu.edu/reg-stats) is defined as Feb. 1 to Jan. 31.
 
@@ -158,39 +157,15 @@ def group_documents(df: DataFrame, return_column: str):
     Returns:
         DataFrame: Documents grouped by presidential year.
     """
-    grouped_df = df.loc[:, ['presidential_year', 'document_number']].groupby('presidential_year').agg('count')
-    grouped_df = grouped_df.rename(columns = {'document_number': return_column})
+    grouped_df = df.loc[:, [
+        group_column,
+        value_column,
+        ]].groupby(group_column).agg('count')  #['count', 'nunique'])
+    if return_column is not None:
+        grouped_df = grouped_df.rename(columns = {value_column: return_column})
     return grouped_df
 
 
-def load_combine_documents(years: list, doctype: str, load_path: Path):
-    """Load documents and combine with January data.
-
-    Args:
-        years (list): List of years.
-        doctype (str): Document type (e.g., RULE, PRORULE, NOTICE, PRESDOCU).
-        load_path (Path): Path to documents.
-
-    Returns:
-        list[dict]: List of retrieved documents.
-    """
-    # define start, end, and last Jan. year
-    start, end, last_year = years[0], years[-1], int(years[-1]) + 1
-
-    # load main set of documents
-    file_name = f"documents_endpoint_{doctype}_{start}_{end}.json"
-    with open(load_path / file_name, "r") as f:
-        set_a = json.load(f)
-
-    # load last year January documents
-    file_name = f"documents_endpoint_{doctype}_{last_year}_Jan.json"
-    with open(load_path / file_name, "r") as f:
-        set_b = json.load(f)
-
-    # combine and return
-    return set_a["results"] + set_b["results"]
-
-
 def main(years: list, fields: list, raw_path: Path, processed_path: Path, processed_file_name: str, replace_existing: bool = True):
     """Runs entire pipeline to retrieve and process documents by presidential year.
 
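group_documents now just counts value_column per group_column; the presidential_year column itself is computed upstream in format_documents, outside this hunk. Under the Feb. 1 to Jan. 31 definition, a January document rolls back into the prior year's bucket. A sketch of that assignment, assuming the upstream derivation works roughly like this:

from pandas import DataFrame, to_datetime

df = DataFrame({"publication_date": ["2023-01-15", "2023-02-01", "2023-12-31"]})
df["publication_dt"] = to_datetime(df["publication_date"])

# January publications belong to the previous presidential year,
# so subtract 1 from the calendar year when month == 1.
df["presidential_year"] = df["publication_dt"].dt.year - (
    df["publication_dt"].dt.month == 1
).astype(int)
print(df)  # 2023-01-15 -> 2022; 2023-02-01 -> 2023; 2023-12-31 -> 2023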
@@ -205,34 +180,21 @@ def main(years: list, fields: list, raw_path: Path, processed_path: Path, proces
     # print directory paths
     print(f"Main folder for processed data: {processed_path}", f"Folder for API data: {raw_path}", sep="\n")
 
-    # final rules
-    doctype = "RULE"
-    final_rules = retrieve_documents(years, doctype, fields, raw_path, replace_existing=replace_existing)
-
-    try:  # block handles when documents have already been retrieved
-        dfRules = format_documents(final_rules)
-    except:
-        final_rules = load_combine_documents(years, doctype, raw_path)
-        dfRules = format_documents(final_rules)
-
-    dfRules, dfCorrections_rule = filter_documents(dfRules)
-    dfRules_grouped = group_documents(dfRules, "final_rules")
-
-    # proposed rules
-    doctype = "PRORULE"
-    proposed_rules = retrieve_documents(years, doctype, fields, raw_path, replace_existing=replace_existing)
-
-    try:  # block handles when documents have already been retrieved
-        dfProps = format_documents(proposed_rules)
-    except:
-        proposed_rules = load_combine_documents(years, doctype, raw_path)
-        dfProps = format_documents(proposed_rules)
-
-    dfProps, dfCorrections_prop = filter_documents(dfProps)
-    dfProps_grouped = group_documents(dfProps, "proposed_rules")
+    # get and process documents
+    df_list, corrections_list = [], []
+    doctypes = {"RULE": "final_rules", "PRORULE": "proposed_rules"}
+    for doctype, fieldname in doctypes.items():
+        documents = retrieve_documents(years, doctype, fields, raw_path, replace_existing=replace_existing)
+        df = format_documents(documents)
+        df, corrections = filter_documents(df)
+        corrections.loc[:, "type"] = doctype
+        df_grouped = group_documents(df, return_column=fieldname)
+        df_list.append(df_grouped)
+        corrections_list.append(corrections)
 
     # join dataframes: non-corrections
-    dfPrez = dfRules_grouped.join(dfProps_grouped)
+    print(df_list[0], df_list[1])
+    dfPrez = df_list[0].join(df_list[1])
     dfPrez = dfPrez.drop(index=1994, errors='ignore')  # drop partial data from 1994 presidential year
     dfPrez = dfPrez.rename_axis('presidential_year', axis=0)  # rename axis
     dfPrez = dfPrez.reset_index()  # reset index so presidential year becomes a column
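Each pass through the loop yields a one-column frame indexed by presidential_year, so df_list[0].join(df_list[1]) is an index-aligned join. A synthetic illustration (the counts are made up):

from pandas import DataFrame

final_rules = DataFrame({"final_rules": [3100, 3050]}, index=[2021, 2022])
proposed_rules = DataFrame({"proposed_rules": [2200, 2150]}, index=[2021, 2022])

# join() aligns on the shared index, mirroring df_list[0].join(df_list[1])
print(final_rules.join(proposed_rules))
#       final_rules  proposed_rules
# 2021         3100            2200
# 2022         3050            2150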
@@ -244,12 +206,12 @@ def main(years: list, fields: list, raw_path: Path, processed_path: Path, proces
         dfPrez.to_csv(f, index=False, lineterminator='\n')
 
     # append dataframes: corrections
-    dfCorrections_rule["type"] = "RULE"
-    dfCorrections_prop["type"] = "PRORULE"
-    dfCorrections = concat([dfCorrections_rule, dfCorrections_prop],
-                           axis=0,
-                           ignore_index=True,
-                           verify_integrity=True)
+    dfCorrections = concat(
+        corrections_list,
+        axis=0,
+        ignore_index=True,
+        verify_integrity=True
+        )
 
     # save csv file
     file_name = processed_path / "federal_register_corrections.csv"
@@ -258,6 +220,6 @@ def main(years: list, fields: list, raw_path: Path, processed_path: Path, proces
 
 
 if __name__ == "__main__":
 
     # run the pipeline
     main(YEAR_RANGE, FIELDS, API_DIR, MAIN_DIR, SAVE_NAME_CSV)