99 from queue import Empty # Python 3
1010except ImportError :
1111 from Queue import Empty # Python 2
12- log_sentinel = [None , None ]
13-
14-
15- def log_listener_process (queue ):
16- while True :
17- try :
18- record = queue .get (timeout = 0.1 )
19- queue .task_done ()
20- if record == log_sentinel : # We send this as a sentinel to tell the listener to quit.
21- break
22- logger = logging .getLogger (record .name )
23- logger .handlers .clear ()
24- logger .handle (record ) # No level or filter logic applied - just do it!
25- except Empty : # Try again
26- pass
27- except Exception as e :
28- import sys , traceback
29- print ('Problems in logging' )
30- traceback .print_exc (file = sys .stderr )
3112
3213
3314def multi_processor_change_acl (adl , path = None , method_name = "" , acl_spec = "" ):
34- log_queue = multiprocessing .JoinableQueue ()
3515 logger = logging .getLogger (__name__ )
3616 logger .setLevel (logging .DEBUG )
3717 queue_bucket_size = 10
@@ -42,7 +22,7 @@ def launch_processes(number_of_processes):
4222 for i in range (number_of_processes ):
4323 process_list .append (multiprocessing .Process (target = processor ,
4424 args = (adl , file_path_queue , finish_queue_processing_flag ,
45- method_name , acl_spec , log_queue )))
25+ method_name , acl_spec , )))
4626 process_list [- 1 ].start ()
4727 return process_list
4828
@@ -65,15 +45,13 @@ def walk(walk_path):
6545 file_path_queue = multiprocessing .JoinableQueue ()
6646 cpu_count = multiprocessing .cpu_count ()
6747 child_processes = launch_processes (2 )
68- log_listener = threading .Thread (target = log_listener_process , args = (log_queue ,))
69- log_listener .start ()
7048
7149 dir_processed_counter = CountUpDownLatch ()
7250 walk_thread_pool = ThreadPoolExecutor (max_workers = worker_thread_num_per_process )
7351
74- file_path_queue .put ([path ]) # Root directory to initialize walk
52+ file_path_queue .put ([path ]) # Root directory to initialize walk
7553 dir_processed_counter .increment ()
76- walk (path ) # Start processing root directory
54+ walk (path ) # Start processing root directory
7755
7856 if dir_processed_counter .is_zero (): # Done processing all directories. Blocking call.
7957 file_path_queue .join () # Wait for operations to be done
@@ -83,29 +61,15 @@ def walk(walk_path):
8361 child .join ()
8462
8563 # Cleanup
86- logger .log (logging .DEBUG , "Sending logger sentinel" )
87- log_queue .put (log_sentinel )
88- log_queue .join ()
89- log_queue .close ()
90- logger .log (logging .DEBUG , "Log queue closed" )
91- log_listener .join ()
92- logger .log (logging .DEBUG , "Log thread finished" )
9364 walk_thread_pool .shutdown ()
9465 logger .log (logging .DEBUG , "Thread pool for worked threads for walk shut down" )
9566 file_path_queue .close ()
9667 logger .log (logging .DEBUG , "File path queue closed" )
9768
9869
99- def processor (adl , file_path_queue , finish_queue_processing_flag , method_name , acl_spec , log_queue ):
70+ def processor (adl , file_path_queue , finish_queue_processing_flag , method_name , acl_spec ):
10071
10172 logger = logging .getLogger (__name__ )
102-
103- try :
104- logger .addHandler (logging .handlers .QueueHandler (log_queue ))
105- logger .propagate = False # Prevents double logging
106- except AttributeError :
107- # Python 2 doesn't have Queue Handler. Default to best effort logging.
108- pass
10973 logger .setLevel (logging .DEBUG )
11074
11175 try :
0 commit comments