Skip to content

Commit

Permalink
improved error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
arjunprakash027 committed May 18, 2024
1 parent 2586f42 commit 920a94a
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 11 deletions.
1 change: 1 addition & 0 deletions airlift/airtable_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import requests
from typing import Any, Dict, Iterable, Iterator, List, Optional
from airlift.airtable_error_handling import ClientError
from airlift.utils_exceptions import CriticalError
from airlift.utils_exceptions import AirtableError
from airlift.csv_data import CSVRowType
from icecream import ic
Expand Down
33 changes: 22 additions & 11 deletions airlift/airtable_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from tqdm import tqdm
from icecream import ic
from airlift.dropbox_client import dropbox_client
from airlift.utils_exceptions import CriticalError

logger = logging.getLogger(__name__)
ATDATA = List[Dict[str, Dict[str, str]]]
Expand Down Expand Up @@ -42,11 +43,20 @@ def upload_data(self) -> None:

with concurrent.futures.ThreadPoolExecutor(max_workers=self.workers) as executor:
futures = [executor.submit(self._worker,data_queue, progress_bar) for _ in
range(self.workers)]
concurrent.futures.wait(futures, timeout=None)
range(self.workers)]

for future in concurrent.futures.as_completed(futures):
try:
future.result() # This will re-raise any exception caught in the worker
except CriticalError as e:
logger.error('A critical error occurred in one of the worker threads: %s', str(e))
self.stop_event.set() # Signal other workers to stop
break
#concurrent.futures.wait(futures, timeout=None)

except Exception as e:
logger.error('Something went wrong while uploading the data: %s', str(e))
#logger.error('Something went wrong while uploading the data: %s', str(e))
raise CriticalError('Something went wrong while uploading the data')



Expand All @@ -64,18 +74,14 @@ def _worker(self,data_queue: Queue, progress_bar) -> None:
if self.client.missing_field_single(column):
data['fields'][column] = data['fields'][self.columns_copy[0]]
else:
logger.warning(
f"The Column {column} is not present in airtable! Please create it and try again")
pass
raise CriticalError(f"The Column {column} is not present in airtable! Please create it and try again")

if self.rename_key_column:
if self.client.missing_field_single(self.rename_key_column[1]):
data['fields'][self.rename_key_column[1]] = data['fields'][self.rename_key_column[0]]
del data['fields'][self.rename_key_column[0]]
else:
logger.warning(
f"The Key Column {column} is not present in airtable! Please create it and try again")
pass
raise CriticalError(f"The Key Column {self.rename_key_column[1]} is not present in airtable! Please create it and try again")

try:
for key, value in data['fields'].items():
Expand Down Expand Up @@ -118,6 +124,11 @@ def _worker(self,data_queue: Queue, progress_bar) -> None:
except Exception as e:
logger.error(e)

except Empty:
break
except Exception as e:

if data_queue.empty():
break
else:
logger.error(e)
raise CriticalError

0 comments on commit 920a94a

Please sign in to comment.