Improved error handling #18

Merged · 4 commits · Aug 10, 2024
51 changes: 46 additions & 5 deletions src/fr_toolbelt/api_requests/get_documents.py
@@ -2,6 +2,7 @@
from datetime import date
from pathlib import Path
import re
+import time

from pandas import DataFrame, read_csv, read_excel
import requests
@@ -52,6 +53,44 @@ class InputFileError(Exception):
    pass


+def sleep_retry(timeout: int, retry: int = 3):
+    """Decorator to sleep and retry a request after receiving an error
+    (source: [RealPython](https://realpython.com/python-sleep/#adding-a-python-sleep-call-with-decorators)).
+
+    Args:
+        timeout (int): Number of seconds to sleep after an error.
+        retry (int, optional): Number of times to retry. Defaults to 3.
+    """
+    def retry_decorator(function):
+        def wrapper(*args, **kwargs):
+            retries = 0
+            while retries < retry:
+                try:
+                    value = function(*args, **kwargs)
+                    if value is not None:
+                        return value
+                    else:
+                        # an empty result should also trigger a retry
+                        raise QueryError
+                except (requests.HTTPError, requests.JSONDecodeError, QueryError):
+                    time.sleep(timeout)
+                    retries += 1
+            # all retries exhausted; fail loudly rather than return None
+            raise QueryError(f"request failed after {retry} retries")
+        return wrapper
+    return retry_decorator
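
A minimal sketch of the decorator in use (the fetch function, URL variable, and 1-second timeout are hypothetical, chosen to keep the illustration fast):

    @sleep_retry(timeout=1, retry=3)
    def fetch_json(url: str) -> dict | None:
        response = requests.get(url)
        response.raise_for_status()  # raises requests.HTTPError on 4xx/5xx
        return response.json()       # raises requests.JSONDecodeError on a bad payload

    # fetch_json(url) now retries up to 3 times, sleeping 1 second between
    # attempts, before raising QueryError.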


+def _ensure_json_response(response: requests.Response) -> dict:
+    """Ensure the response contains valid JSON by checking for a 200 status code.
+    Returns the JSON payload, or an empty dictionary on any other status.
+    """
+    if response.status_code == 200:
+        res_json = response.json()
+    else:
+        res_json = {}
+    return res_json
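
In isolation, the guard behaves like this (variable names reused from the request loops below; any non-200 status yields an empty dict, so downstream .get() calls stay safe):

    response = requests.get(endpoint_url, params=dict_params)
    data = _ensure_json_response(response)  # {} on 404, 429, 500, ...
    results = data.get("results", [])       # never raises KeyError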


+@sleep_retry(60)
def _retrieve_results_by_page_range(num_pages: int, endpoint_url: str, dict_params: dict) -> tuple[list, int]:
    """Retrieve documents by looping over a given number of pages.

@@ -67,14 +106,16 @@ def _retrieve_results_by_page_range(num_pages: int, endpoint_url: str, dict_para
    for page in range(1, num_pages + 1):  # grab results from each page
        dict_params.update({"page": page})
        response = requests.get(endpoint_url, params=dict_params)
-        results_this_page = response.json()["results"]
+        response = _ensure_json_response(response)
+        results_this_page = response.get("results", [])
        results.extend(results_this_page)
        tally += len(results_this_page)
-        count = response.json()["count"]
+        count = response.get("count", 0)
    return results, count
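
A hypothetical call, assuming the public Federal Register documents endpoint and its query-string conventions (both are assumptions here, not part of this diff):

    endpoint_url = "https://www.federalregister.gov/api/v1/documents.json"
    params = {"per_page": 100, "conditions[publication_date][year]": "2024"}
    results, count = _retrieve_results_by_page_range(3, endpoint_url, params)
    # results: up to 300 documents; count: total matches reported by the API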


+@sleep_retry(60)
def _retrieve_results_by_next_page(endpoint_url: str, dict_params: dict) -> list:
    """Retrieve documents by accessing "next_page_url" returned by each request.

@@ -89,19 +130,20 @@ def _retrieve_results_by_next_page(endpoint_url: str, dict_params: dict) -> list
        list: Documents retrieved from the API.
    """
    results = []
-    response = requests.get(endpoint_url, params=dict_params).json()
+    response = requests.get(endpoint_url, params=dict_params)
+    response = _ensure_json_response(response)
    pages = response.get("total_pages", 1)
    next_page_url = response.get("next_page_url")
    counter = 0
    while next_page_url is not None:
        counter += 1
-        results_this_page = response["results"]
+        results_this_page = response.get("results", [])
        results.extend(results_this_page)
        response = requests.get(next_page_url).json()
        next_page_url = response.get("next_page_url")
    else:
        counter += 1
-        results_this_page = response["results"]
+        results_this_page = response.get("results", [])
        results.extend(results_this_page)

    # raise exception if failed to access all pages
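
One subtlety worth flagging: Python's while/else runs the else block whenever the loop condition becomes false without a break, including when the body never runs at all. That is what lets the else branch above collect the final page, or the only page of a single-page response. In miniature:

    n = 0
    while n < 2:
        print("page", n)    # runs for n = 0 and n = 1
        n += 1
    else:
        print("last page")  # runs once the condition is false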
@@ -115,7 +157,6 @@ def _query_documents_endpoint(
        endpoint_url: str,
        dict_params: dict,
        handle_duplicates: bool | str = False,
-        #show_progress: bool = False,
        **kwargs
    ) -> tuple[list, int]:
    """GET request for documents endpoint.
4 changes: 2 additions & 2 deletions src/fr_toolbelt/preprocessing/agencies.py
@@ -36,7 +36,7 @@ class AgencyMetadata:
    Args:
        data (list[dict], optional): Accepts a JSON object of structure iterable[dict]. Defaults to None.
    """
-    def __init__(self, data: list[dict] = None):
+    def __init__(self, data: list[dict] | None = None):
        if data is not None:
            self.data = data
        else:
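
This annotation change repeats in dockets.py, documents.py, and fields.py below: a None default contradicts a bare list[dict] annotation (the implicit-optional pattern modern type checkers reject), while the PEP 604 union syntax available since Python 3.10 makes the optionality explicit. Schematically:

    def old(data: list[dict] = None): ...         # annotation says list, default says None
    def new(data: list[dict] | None = None): ...  # annotation and default agree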
@@ -65,7 +65,7 @@ def __extract_metadata(
        # return response as json
        return agencies_response.json()

-    def __extract_schema(self, metadata: dict[dict] = None):
+    def __extract_schema(self, metadata: dict[dict] | None = None):
        """Get Agency schema of agencies available from API.

        Args:
2 changes: 1 addition & 1 deletion src/fr_toolbelt/preprocessing/dockets.py
@@ -9,7 +9,7 @@ def __init__(self,
                 documents: list[dict],
                 field_key: str = "regulations_dot_gov_info",
                 subfield_key: str = "docket_id",
-                 value_key: str = None
+                 value_key: str | None = None
                 ) -> None:
        super().__init__(documents=documents, field_key=field_key, subfield_key=subfield_key)
        if value_key is None:
2 changes: 1 addition & 1 deletion src/fr_toolbelt/preprocessing/documents.py
@@ -12,7 +12,7 @@ def process_documents(
        documents: list[dict],
        which: str | list | tuple = "all",
        docket_data_source: str = "dockets",
-        del_keys: str | list | tuple = None,
+        del_keys: str | list | tuple | None = None,
        **kwargs
    ) -> list[dict]:
    """Process one or more fields in each document.
14 changes: 7 additions & 7 deletions src/fr_toolbelt/preprocessing/fields.py
@@ -6,10 +6,10 @@ class FieldData(ABC):
"""Base class for processing Federal Register fields."""
def __init__(self,
documents: list[dict],
field_key: str = None,
subfield_key: str = None,
field_key: str | None = None,
subfield_key: str | None = None,
subfield_keys: tuple[str] = (),
value_key: str = None,
value_key: str | None = None,
value_keys: tuple[str] = ()
) -> None:
self.documents = documents
@@ -23,14 +23,14 @@ def __init__(self,
    def _extract_field_info(self, document: dict):
        pass

-    def _create_value_key(self, document: dict, values: str = None) -> dict:
+    def _create_value_key(self, document: dict, values: str | None = None) -> dict:
        document_copy = document.copy()
        document_copy.update(
            {self.value_key: values, }
        )
        return document_copy

-    def _create_value_keys(self, document: dict, values: tuple = None) -> dict:
+    def _create_value_keys(self, document: dict, values: tuple | None = None) -> dict:

        document_copy = document.copy()
        # values: rin_info tuples (RIN, Priority, UA issue)
@@ -44,7 +44,7 @@ def _create_value_keys(self, document: dict, values: tuple = None) -> dict:
        )
        return document_copy

-    def _del_field_key(self, document: dict, add_keys: str | tuple | list = None):
+    def _del_field_key(self, document: dict, add_keys: str | tuple | list | None = None):
        document_copy = document.copy()
        if add_keys is not None:
            if isinstance(add_keys, str):
@@ -59,7 +59,7 @@ def _del_field_key(self, document: dict, add_keys: str | tuple | list = None):
        document_copy.pop(self.field_key, None)
        return document_copy

-    def process_data(self, del_keys: str | tuple | list = None) -> list[dict]:
+    def process_data(self, del_keys: str | tuple | list | None = None) -> list[dict]:

        if self.value_key is not None:
            return [self._del_field_key(self._create_value_key(doc, values=self._extract_field_info(doc)), add_keys=del_keys) for doc in self.documents]
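
A sketch of what that value_key branch does to a single document, reusing the default keys from dockets.py above (the document values are hypothetical):

    doc = {"document_number": "2024-12345",
           "regulations_dot_gov_info": {"docket_id": "EPA-HQ-OAR-2024-0001"}}
    # _extract_field_info pulls the docket_id out of the nested field,
    # _create_value_key writes it under a flat key, and _del_field_key
    # drops the nested original, leaving roughly:
    # {"document_number": "2024-12345", "docket_id": "EPA-HQ-OAR-2024-0001"}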