Skip to content

Commit db2a5e0

Browse files
Committed: Refactoring
1 parent 60dab54 commit db2a5e0

File tree

2 files changed

+40
-96
lines changed

2 files changed

+40
-96
lines changed

azure/datalake/store/multiprocessor.py

Lines changed: 40 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -6,33 +6,32 @@
66
import os
77
import logging.handlers
88
from .exceptions import FileNotFoundError
9-
10-
119
try:
1210
from queue import Empty # Python 3
1311
except ImportError:
1412
from Queue import Empty # Python 2
15-
end_queue_sentinel = [None, None]
1613

17-
exception = None
18-
exception_lock = threading.Lock()
14+
WORKER_THREAD_PER_PROCESS = 50
15+
QUEUE_BUCKET_SIZE = 10
16+
END_QUEUE_SENTINEL = [None, None]
17+
GLOBAL_EXCEPTION = None
18+
GLOBAL_EXCEPTION_LOCK = threading.Lock()
1919

2020

21-
threading
2221
def monitor_exception(exception_queue, process_ids):
23-
global exception
22+
global GLOBAL_EXCEPTION
2423
logger = logging.getLogger(__name__)
2524
logger.setLevel(logging.DEBUG)
2625

2726
while True:
2827
try:
29-
excep = exception_queue.get(timeout=0.1)
30-
if excep == end_queue_sentinel:
28+
local_exception = exception_queue.get(timeout=0.1)
29+
if local_exception == END_QUEUE_SENTINEL:
3130
break
3231
logger.log(logging.DEBUG, "Setting global exception")
33-
exception_lock.acquire()
34-
exception = excep
35-
exception_lock.release()
32+
GLOBAL_EXCEPTION_LOCK.acquire()
33+
GLOBAL_EXCEPTION = local_exception
34+
GLOBAL_EXCEPTION_LOCK.release()
3635
logger.log(logging.DEBUG, "Closing processes")
3736
for p in process_ids:
3837
p.terminate()
@@ -41,7 +40,7 @@ def monitor_exception(exception_queue, process_ids):
4140
p.join()
4241
import thread
4342
logger.log(logging.DEBUG, "Interrupting main")
44-
raise Exception(excep)
43+
raise Exception(local_exception)
4544
except Empty:
4645
pass
4746

@@ -51,11 +50,11 @@ def log_listener_process(queue):
5150
try:
5251
record = queue.get(timeout=0.1)
5352
queue.task_done()
54-
if record == end_queue_sentinel: # We send this as a sentinel to tell the listener to quit.
53+
if record == END_QUEUE_SENTINEL: # We send this as a sentinel to tell the listener to quit.
5554
break
5655
logger = logging.getLogger(record.name)
5756
logger.handlers.clear()
58-
logger.handle(record) # No level or filter logic applied - just do it!
57+
logger.handle(record) # No level or filter logic applied - just do it!
5958
except Empty: # Try again
6059
pass
6160
except Exception as e:
@@ -65,14 +64,12 @@ def log_listener_process(queue):
6564

6665

6766
def multi_processor_change_acl(adl, path=None, method_name="", acl_spec="", number_of_sub_process=None):
68-
log_queue = multiprocessing.JoinableQueue()
69-
exception_queue = multiprocessing.Queue()
7067
logger = logging.getLogger(__name__)
7168
logger.setLevel(logging.DEBUG)
72-
queue_bucket_size = 10
73-
worker_thread_num_per_process = 50
7469

7570
def launch_processes(number_of_processes):
71+
if number_of_processes is None:
72+
number_of_processes = max(2, multiprocessing.cpu_count() - 1)
7673
process_list = []
7774
for i in range(number_of_processes):
7875
process_list.append(multiprocessing.Process(target=processor,
@@ -84,46 +81,50 @@ def launch_processes(number_of_processes):
8481
def walk(walk_path):
8582
try:
8683
paths = []
87-
all_files = adl._ls(path=walk_path)
84+
all_files = adl.ls(path=walk_path, detail=True)
8885

8986
for files in all_files:
9087
if files['type'] == 'DIRECTORY':
9188
dir_processed_counter.increment() # A new directory to process
9289
walk_thread_pool.submit(walk, files['name'])
9390
paths.append(files['name'])
94-
if len(paths) == queue_bucket_size:
91+
if len(paths) == QUEUE_BUCKET_SIZE:
9592
file_path_queue.put(list(paths))
9693
paths = []
9794
if paths != []:
9895
file_path_queue.put(list(paths)) # For leftover paths < bucket_size
9996
except FileNotFoundError:
10097
pass # Continue in case the file was deleted in between
101-
except:
98+
except Exception:
10299
import traceback
103100
logger.exception("Failed to walk for path: " + str(walk_path) + ". Exiting!")
104101
exception_queue.put(traceback.format_exc())
105102
finally:
106103
dir_processed_counter.decrement() # Processing complete for this directory
107104

105+
# Initialize concurrency primitives
106+
log_queue = multiprocessing.JoinableQueue()
107+
exception_queue = multiprocessing.Queue()
108108
finish_queue_processing_flag = multiprocessing.Event()
109109
file_path_queue = multiprocessing.JoinableQueue()
110-
if number_of_sub_process == None:
111-
number_of_sub_process = max(2, multiprocessing.cpu_count()-1)
110+
dir_processed_counter = CountUpDownLatch()
112111

112+
# Start relevant threads and processes
113+
log_listener = threading.Thread(target=log_listener_process, args=(log_queue,))
114+
log_listener.start()
113115
child_processes = launch_processes(number_of_sub_process)
114116
exception_monitor_thread = threading.Thread(target=monitor_exception, args=(exception_queue, child_processes))
115117
exception_monitor_thread.start()
116-
log_listener = threading.Thread(target=log_listener_process, args=(log_queue,))
117-
log_listener.start()
118+
walk_thread_pool = ThreadPoolExecutor(max_workers=WORKER_THREAD_PER_PROCESS)
118119

119-
dir_processed_counter = CountUpDownLatch()
120-
walk_thread_pool = ThreadPoolExecutor(max_workers=worker_thread_num_per_process)
121-
122-
file_path_queue.put([path]) # Root directory needs to be passed
120+
# Root directory needs to be explicitly passed
121+
file_path_queue.put([path])
123122
dir_processed_counter.increment()
124-
walk(path) # Start processing root directory
125123

126-
if dir_processed_counter.is_zero(): # Done processing all directories. Blocking call.
124+
# Processing starts here
125+
walk(path)
126+
127+
if dir_processed_counter.is_zero(): # Done processing all directories. Blocking call.
127128
walk_thread_pool.shutdown()
128129
file_path_queue.close() # No new elements to add
129130
file_path_queue.join() # Wait for operations to be done
@@ -135,11 +136,11 @@ def walk(walk_path):
135136

136137
# Cleanup
137138
logger.log(logging.DEBUG, "Sending exception sentinel")
138-
exception_queue.put(end_queue_sentinel)
139+
exception_queue.put(END_QUEUE_SENTINEL)
139140
exception_monitor_thread.join()
140141
logger.log(logging.DEBUG, "Exception monitor thread finished")
141142
logger.log(logging.DEBUG, "Sending logger sentinel")
142-
log_queue.put(end_queue_sentinel)
143+
log_queue.put(END_QUEUE_SENTINEL)
143144
log_queue.join()
144145
log_queue.close()
145146
logger.log(logging.DEBUG, "Log queue closed")
@@ -159,21 +160,19 @@ def processor(adl, file_path_queue, finish_queue_processing_flag, method_name, a
159160
logger.setLevel(logging.DEBUG)
160161

161162
try:
162-
worker_thread_num_per_process = 50
163163
func_table = {"mod_acl": adl.modify_acl_entries, "set_acl": adl.set_acl, "rem_acl": adl.remove_acl_entries}
164-
function_thread_pool = ThreadPoolExecutor(max_workers=worker_thread_num_per_process)
164+
function_thread_pool = ThreadPoolExecutor(max_workers=WORKER_THREAD_PER_PROCESS)
165165
adl_function = func_table[method_name]
166166
logger.log(logging.DEBUG, "Started processor pid:"+str(os.getpid()))
167167

168168
def func_wrapper(func, path, spec):
169169
try:
170170
func(path=path, acl_spec=spec)
171-
except FileNotFoundError as e:
171+
except FileNotFoundError:
172172
logger.exception("File "+str(path)+" not found")
173-
pass # Exception is being logged in the relevant acl method. Do nothing here
174-
except:
175-
# TODO Raise to parent process
176-
pass
173+
# Complete Exception is being logged in the relevant acl method. Don't print exception here
174+
except Exception as e:
175+
logger.exception("File " + str(path) + " not set. Exception "+str(e))
177176

178177
logger.log(logging.DEBUG, "Completed running on path:" + str(path))
179178

@@ -189,7 +188,6 @@ def func_wrapper(func, path, spec):
189188

190189
except Exception as e:
191190
import traceback
192-
# TODO Raise to parent process
193191
logger.exception("Exception in pid "+str(os.getpid())+"Exception: " + str(e))
194192
exception_queue.put(traceback.format_exc())
195193
finally:

azure_bdist_wheel.py

Lines changed: 0 additions & 54 deletions
This file was deleted.

0 commit comments

Comments (0)