-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
airtable_upload.py
112 lines (93 loc) · 4.92 KB
/
airtable_upload.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import concurrent.futures
import logging
import os
import threading
from queue import Queue, Empty
from typing import Any, Dict, Iterable, Iterator, List, Optional

from icecream import ic
from tqdm import tqdm

from airlift.airtable_client import new_client
from airlift.dropbox_client import dropbox_client
from airlift.dropbox_client import dropbox_client  # NOTE(review): duplicate kept from original; safe to remove
from airlift.utils_exceptions import CriticalError
# Module-level logger, named after this module per logging convention.
logger = logging.getLogger(__name__)
# Shape of the prepared upload payload: a list of records, each a dict whose
# "fields" entry maps column names to cell values (presumably Airtable's
# record format — confirm against airtable_client.single_upload).
ATDATA = List[Dict[str, Dict[str, str]]]
class Upload:
    """Upload prepared records to Airtable using a pool of worker threads.

    Attachment columns are handled by first uploading the local file to
    Dropbox and substituting the resulting shared URL, since Airtable
    attachment fields require a publicly reachable URL.
    """

    def __init__(self, client: new_client, new_data: ATDATA, dbx: dropbox_client, args: dict):
        """Capture the Airtable client, the records to upload, the optional
        Dropbox client, and the relevant CLI arguments.

        :param client: Airtable client used for the actual record upload.
        :param new_data: Prepared records (see ``ATDATA``).
        :param dbx: Dropbox client for attachment uploads; may be falsy when
            no attachments are involved.
        :param args: Parsed CLI arguments (attribute access, despite the
            ``dict`` annotation kept for interface compatibility).
        """
        self.dbx = dbx
        self.new_data = new_data
        self.client = client
        self.dirname = os.path.dirname(args.csv_file)
        self.basename = os.path.basename(args.csv_file)
        self.attachment_columns = args.attachment_columns
        self.attachment_columns_map = args.attachment_columns_map
        self.columns_copy = args.columns_copy
        self.rename_key_column = args.rename_key_column
        # Default to 5 workers when not specified on the command line.
        self.workers = args.workers if args.workers else 5
        self.log = args.log
        # Cooperative shutdown flag for the worker threads.  upload_data()
        # previously referenced self.stop_event without it ever being
        # created, which raised AttributeError on the error path.
        self.stop_event = threading.Event()

    def write_log(self, file_path, line):
        """Append a single line to the text log at ``file_path``."""
        with open(file_path, 'a') as file:
            file.write(line + '\n')

    def upload_data(self) -> None:
        """Fan the prepared records out to ``self.workers`` threads and upload.

        :raises CriticalError: when the upload cannot be completed.
        """
        logger.info("Uploading data now!")
        progress_bar = tqdm(total=len(self.new_data), leave=False)
        try:
            data_queue = Queue()
            for data in self.new_data:
                data_queue.put(data)
            with concurrent.futures.ThreadPoolExecutor(max_workers=self.workers) as executor:
                futures = [
                    executor.submit(self._worker, data_queue, progress_bar)
                    for _ in range(self.workers)
                ]
                for future in concurrent.futures.as_completed(futures):
                    try:
                        # Re-raises any exception captured in the worker.
                        future.result()
                    except CriticalError as e:
                        logger.error('A critical error occurred in one of the worker threads: %s', str(e))
                        # Signal the remaining workers to stop draining the queue.
                        self.stop_event.set()
                        break
        except Exception as e:
            # Chain the cause so the original traceback is not lost.
            raise CriticalError('Something went wrong while uploading the data') from e
        finally:
            progress_bar.close()

    def _worker(self, data_queue: Queue, progress_bar) -> None:
        """Drain records from ``data_queue``, resolving attachment columns via
        Dropbox, and upload each record through the Airtable client.

        Runs until the queue is empty or ``self.stop_event`` is set.

        :raises CriticalError: on any per-record failure other than a missing
            attachment file (which is logged and the field blanked instead).
        """
        while not self.stop_event.is_set():
            try:
                data = data_queue.get_nowait()
            except Empty:
                # Queue drained -- this worker is done.  Previously any
                # exception with an empty queue broke out silently, which
                # could drop a record whose upload failed.
                break
            try:
                # Snapshot the items: the attachment_columns_map branch may
                # insert new keys into data['fields'], and mutating a dict
                # while iterating it raises RuntimeError.
                for key, value in list(data['fields'].items()):
                    if self.attachment_columns and self.dbx and key in self.attachment_columns:
                        try:
                            path = f"{self.dirname}/{value}" if self.dirname else f"{value}"
                            data['fields'][key] = [{"url": self.dbx.upload_to_dropbox(path)}]
                        except Exception:
                            # Missing attachment file: log it and blank the
                            # field rather than failing the whole record.
                            self.write_log(self.log, f"{value} Could not be found!")
                            tqdm.write(f"{value} Could not be found!")
                            data['fields'][key] = ""
                    if self.attachment_columns_map:
                        if self.dbx:
                            # Each mapping is (source_column, target_column).
                            for attachments in self.attachment_columns_map:
                                if key == attachments[0]:
                                    try:
                                        path = f"{self.dirname}/{value}" if self.dirname else f"{value}"
                                        data['fields'][attachments[1]] = [
                                            {"url": self.dbx.upload_to_dropbox(path)}]
                                    except Exception:
                                        self.write_log(self.log, f"{value} Could not be found!")
                                        tqdm.write(f"{value} Could not be found!")
                                        data['fields'][attachments[1]] = ""
                        else:
                            logger.error("Dropbox token not provided! Aborting the upload!")
                self.client.single_upload(data)
                progress_bar.update(1)
            except Exception as e:
                # A genuine upload failure must surface, never be swallowed.
                logger.error(e)
                raise CriticalError