diff --git a/airlift/airtable_client.py b/airlift/airtable_client.py index 05be517..a52da0b 100644 --- a/airlift/airtable_client.py +++ b/airlift/airtable_client.py @@ -3,6 +3,7 @@ import requests from typing import Any, Dict, Iterable, Iterator, List, Optional from airlift.airtable_error_handling import ClientError +from airlift.utils_exceptions import CriticalError from airlift.utils_exceptions import AirtableError from airlift.csv_data import CSVRowType from icecream import ic diff --git a/airlift/airtable_upload.py b/airlift/airtable_upload.py index 31e2a5d..9364338 100644 --- a/airlift/airtable_upload.py +++ b/airlift/airtable_upload.py @@ -8,6 +8,7 @@ from tqdm import tqdm from icecream import ic from airlift.dropbox_client import dropbox_client +from airlift.utils_exceptions import CriticalError logger = logging.getLogger(__name__) ATDATA = List[Dict[str, Dict[str, str]]] @@ -42,11 +43,20 @@ def upload_data(self) -> None: with concurrent.futures.ThreadPoolExecutor(max_workers=self.workers) as executor: futures = [executor.submit(self._worker,data_queue, progress_bar) for _ in - range(self.workers)] - concurrent.futures.wait(futures, timeout=None) + range(self.workers)] + + for future in concurrent.futures.as_completed(futures): + try: + future.result() # This will re-raise any exception caught in the worker + except CriticalError as e: + logger.error('A critical error occurred in one of the worker threads: %s', str(e)) + self.stop_event.set() # Signal other workers to stop + break + #concurrent.futures.wait(futures, timeout=None) except Exception as e: - logger.error('Something went wrong while uploading the data: %s', str(e)) + #logger.error('Something went wrong while uploading the data: %s', str(e)) + raise CriticalError('Something went wrong while uploading the data') @@ -64,18 +74,14 @@ def _worker(self,data_queue: Queue, progress_bar) -> None: if self.client.missing_field_single(column): data['fields'][column] = data['fields'][self.columns_copy[0]] else: - logger.warning( - f"The Column {column} is not present in airtable! Please create it and try again") - pass + raise CriticalError(f"The Column {column} is not present in airtable! Please create it and try again") if self.rename_key_column: if self.client.missing_field_single(self.rename_key_column[1]): data['fields'][self.rename_key_column[1]] = data['fields'][self.rename_key_column[0]] del data['fields'][self.rename_key_column[0]] else: - logger.warning( - f"The Key Column {column} is not present in airtable! Please create it and try again") - pass + raise CriticalError(f"The Key Column {self.rename_key_column[1]} is not present in airtable! Please create it and try again") try: for key, value in data['fields'].items(): @@ -118,6 +124,11 @@ def _worker(self,data_queue: Queue, progress_bar) -> None: except Exception as e: logger.error(e) - except Empty: - break + except Exception as e: + + if data_queue.empty(): + break + else: + logger.error(e) + raise CriticalError