Skip to content

Commit 2628e49

Browse files
authored
Recursive ACL functions (#247)
* Added CountUpDownLatch to utilities * Basic Acl tests * Added tests for recursive acl functions * Added recursive acl functionality
1 parent 96c064b commit 2628e49

File tree

6 files changed

+389
-14
lines changed

6 files changed

+389
-14
lines changed

azure/datalake/store/core.py

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import io
1919
import logging
2020
import sys
21-
import time
2221
import uuid
2322
import json
2423

@@ -30,6 +29,7 @@
3029
from .utils import ensure_writable, read_block
3130
from .enums import ExpiryOptionType
3231
from .retry import ExponentialRetryPolicy
32+
from .multiprocessor import multi_processor_change_acl
3333

3434
if sys.version_info >= (3, 4):
3535
import pathlib
@@ -329,11 +329,11 @@ def _acl_call(self, action, path, acl_spec=None, invalidate_cache=False):
329329

330330
return to_return
331331

332-
def set_acl(self, path, acl_spec, recursive=False, number_of_sub_process=None):
    """
    Sets the Access Control List (ACL) for a file or folder.

    Note: this is by default not recursive, and applies only to the file or folder specified.

    Parameters
    ----------
    path: str
        Location to set the ACL on.
    acl_spec: str
        The ACL specification to set on the path in the format
        '[default:]user|group|other:[entity id or UPN]:r|-w|-x|-,[default:]user|group|other:[entity id or UPN]:r|-w|-x|-,...'
    recursive: bool
        Specifies whether to set ACLs recursively or not
    number_of_sub_process: int
        Number of child processes used when recursive is True; when None the
        recursive helper picks a default based on the CPU count.
    """
    if recursive:
        # Fan out over the whole subtree using multiple processes.
        multi_processor_change_acl(adl=self, path=path, method_name="set_acl", acl_spec=acl_spec, number_of_sub_process=number_of_sub_process)
    else:
        self._acl_call('SETACL', path, acl_spec, invalidate_cache=True)
def modify_acl_entries(self, path, acl_spec, recursive=False, number_of_sub_process=None):
    """
    Modifies existing Access Control List (ACL) entries on a file or folder.
    If the entry does not exist it is added, otherwise it is updated based on the spec passed in.
    No entries are removed by this process (unlike set_acl).

    Note: this is by default not recursive, and applies only to the file or folder specified.

    Parameters
    ----------
    path: str
        Location to modify the ACL on.
    acl_spec: str
        The ACL specification to use in modifying the ACL at the path in the format
        '[default:]user|group|other:[entity id or UPN]:r|-w|-x|-,[default:]user|group|other:[entity id or UPN]:r|-w|-x|-,...'
    recursive: bool
        Specifies whether to modify ACLs recursively or not
    number_of_sub_process: int
        Number of child processes used when recursive is True; when None the
        recursive helper picks a default based on the CPU count.
    """
    if recursive:
        # Fan out over the whole subtree using multiple processes.
        multi_processor_change_acl(adl=self, path=path, method_name="mod_acl", acl_spec=acl_spec, number_of_sub_process=number_of_sub_process)
    else:
        self._acl_call('MODIFYACLENTRIES', path, acl_spec, invalidate_cache=True)
def remove_acl_entries(self, path, acl_spec, recursive=False, number_of_sub_process=None):
    """
    Removes existing, named, Access Control List (ACL) entries on a file or folder.
    If the entry does not exist already it is ignored.
    Default entries cannot be removed this way, please use remove_default_acl for that.
    Unnamed entries cannot be removed in this way, please use remove_acl for that.

    Note: this is by default not recursive, and applies only to the file or folder specified.

    Parameters
    ----------
    path: str
        Location to remove the ACL entries on.
    acl_spec: str
        The ACL specification to remove from the ACL at the path in the format (note that the permission portion is missing)
        '[default:]user|group|other:[entity id or UPN],[default:]user|group|other:[entity id or UPN],...'
    recursive: bool
        Specifies whether to remove ACLs recursively or not
    number_of_sub_process: int
        Number of child processes used when recursive is True; when None the
        recursive helper picks a default based on the CPU count.
    """
    if recursive:
        # Fan out over the whole subtree using multiple processes.
        multi_processor_change_acl(adl=self, path=path, method_name="rem_acl", acl_spec=acl_spec, number_of_sub_process=number_of_sub_process)
    else:
        self._acl_call('REMOVEACLENTRIES', path, acl_spec, invalidate_cache=True)
388400

389401
def get_acl_status(self, path):
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
from concurrent.futures import ThreadPoolExecutor
2+
from .utils import CountUpDownLatch
3+
import threading
4+
import logging
5+
import multiprocessing
6+
import os
7+
import logging.handlers
8+
from .exceptions import FileNotFoundError
9+
10+
11+
try:
12+
from queue import Empty # Python 3
13+
except ImportError:
14+
from Queue import Empty # Python 2
15+
end_queue_sentinel = [None, None]
16+
17+
exception = None
18+
exception_lock = threading.Lock()
19+
20+
21+
def monitor_exception(exception_queue, process_ids):
    """Watch *exception_queue* for errors reported by child processes.

    Runs on a dedicated thread until the ``end_queue_sentinel`` arrives.
    On the first reported error it records it in the module-global
    ``exception`` (under ``exception_lock``), terminates and joins all
    child processes in *process_ids*, then re-raises to end this thread.
    """
    global exception
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)

    while True:
        try:
            excep = exception_queue.get(timeout=0.1)
            if excep == end_queue_sentinel:
                break
            logger.log(logging.DEBUG, "Setting global exception")
            exception_lock.acquire()
            exception = excep
            exception_lock.release()
            logger.log(logging.DEBUG, "Closing processes")
            for p in process_ids:
                p.terminate()
            logger.log(logging.DEBUG, "Joining processes")
            for p in process_ids:
                p.join()
            # NOTE(review): the original did 'import thread' here, which raises
            # ImportError on Python 3 (module renamed to _thread) and therefore
            # killed this thread before the raise below could ever run. The
            # raise only terminates this monitor thread; if the main thread is
            # meant to be interrupted (as the log message suggests), that would
            # need _thread.interrupt_main() -- confirm intended behavior.
            logger.log(logging.DEBUG, "Interrupting main")
            raise Exception(excep)
        except Empty:
            pass
48+
49+
def log_listener_process(queue):
    """Drain log records from *queue* and hand them to the local logger.

    Runs on a dedicated thread until the ``end_queue_sentinel`` arrives.
    Child processes ship LogRecords here via a QueueHandler so that all
    logging is emitted from a single process.
    """
    while True:
        try:
            record = queue.get(timeout=0.1)
            queue.task_done()
            if record == end_queue_sentinel:  # We send this as a sentinel to tell the listener to quit.
                break
            logger = logging.getLogger(record.name)
            # del logger.handlers[:] instead of handlers.clear(): list.clear()
            # is Python 3.3+ only and this module still supports Python 2.
            del logger.handlers[:]
            logger.handle(record)  # No level or filter logic applied - just do it!
        except Empty:  # Try again
            pass
        except Exception:
            import sys, traceback
            print('Problems in logging')
            traceback.print_exc(file=sys.stderr)
66+
67+
def multi_processor_change_acl(adl, path=None, method_name="", acl_spec="", number_of_sub_process=None):
    """Apply an ACL change recursively under *path* using multiple processes.

    The directory tree is walked with a thread pool; discovered paths are
    batched onto a multiprocessing queue that child processes (see
    ``processor``) drain, each applying the requested ACL method.

    Parameters
    ----------
    adl:
        Filesystem client used for listing (``_ls``) and for the ACL calls.
    path: str
        Root of the subtree to change.
    method_name: str
        One of 'set_acl', 'mod_acl', 'rem_acl' (keys of processor's func_table).
    acl_spec: str
        The ACL specification forwarded to the ACL method.
    number_of_sub_process: int
        Number of child processes; defaults to max(2, cpu_count() - 1).
    """
    log_queue = multiprocessing.JoinableQueue()
    exception_queue = multiprocessing.Queue()
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    queue_bucket_size = 10
    worker_thread_num_per_process = 50

    def launch_processes(number_of_processes):
        # Start the child processes that drain file_path_queue.
        process_list = []
        for i in range(number_of_processes):
            process_list.append(multiprocessing.Process(target=processor,
                                args=(adl, file_path_queue, finish_queue_processing_flag,
                                      method_name, acl_spec, log_queue, exception_queue)))
            process_list[-1].start()
        return process_list

    def walk(walk_path):
        # List one directory; recurse into sub-directories on the thread
        # pool and enqueue every entry (files and directories) in batches.
        try:
            paths = []
            all_files = adl._ls(path=walk_path)

            for files in all_files:
                if files['type'] == 'DIRECTORY':
                    dir_processed_counter.increment()  # A new directory to process
                    walk_thread_pool.submit(walk, files['name'])
                paths.append(files['name'])
                if len(paths) == queue_bucket_size:
                    file_path_queue.put(list(paths))
                    paths = []
            if paths != []:
                file_path_queue.put(list(paths))  # For leftover paths < bucket_size
        except FileNotFoundError:
            pass  # Continue in case the file was deleted in between
        except:
            import traceback
            logger.exception("Failed to walk for path: " + str(walk_path) + ". Exiting!")
            exception_queue.put(traceback.format_exc())
        finally:
            dir_processed_counter.decrement()  # Processing complete for this directory

    finish_queue_processing_flag = multiprocessing.Event()
    file_path_queue = multiprocessing.JoinableQueue()
    if number_of_sub_process is None:  # 'is None', not '== None'
        number_of_sub_process = max(2, multiprocessing.cpu_count() - 1)

    child_processes = launch_processes(number_of_sub_process)
    exception_monitor_thread = threading.Thread(target=monitor_exception, args=(exception_queue, child_processes))
    exception_monitor_thread.start()
    log_listener = threading.Thread(target=log_listener_process, args=(log_queue,))
    log_listener.start()

    dir_processed_counter = CountUpDownLatch()
    walk_thread_pool = ThreadPoolExecutor(max_workers=worker_thread_num_per_process)

    file_path_queue.put([path])  # Root directory needs to be passed
    dir_processed_counter.increment()
    walk(path)  # Start processing root directory

    if dir_processed_counter.is_zero():  # Done processing all directories. Blocking call.
        walk_thread_pool.shutdown()
        file_path_queue.close()  # No new elements to add
        file_path_queue.join()  # Wait for operations to be done
        logger.log(logging.DEBUG, "file path queue closed")
        finish_queue_processing_flag.set()  # Set flag to break loop of child processes
        for child in child_processes:  # Wait for all child process to finish
            logger.log(logging.DEBUG, "Joining process: " + str(child.pid))
            child.join()

    # Cleanup
    logger.log(logging.DEBUG, "Sending exception sentinel")
    exception_queue.put(end_queue_sentinel)
    exception_monitor_thread.join()
    logger.log(logging.DEBUG, "Exception monitor thread finished")
    logger.log(logging.DEBUG, "Sending logger sentinel")
    log_queue.put(end_queue_sentinel)
    log_queue.join()
    log_queue.close()
    logger.log(logging.DEBUG, "Log queue closed")
    log_listener.join()
    logger.log(logging.DEBUG, "Log thread finished")
150+
def processor(adl, file_path_queue, finish_queue_processing_flag, method_name, acl_spec, log_queue, exception_queue):
    """Child-process worker: apply the chosen ACL method to queued paths.

    Drains batches of paths from *file_path_queue* onto a thread pool until
    *finish_queue_processing_flag* is set by the parent. Log records are
    shipped back through *log_queue*; fatal errors through *exception_queue*.
    """
    logger = logging.getLogger(__name__)

    try:
        logger.addHandler(logging.handlers.QueueHandler(log_queue))
        logger.propagate = False  # Prevents double logging
    except AttributeError:
        # Python 2 doesn't have Queue Handler. Default to best effort logging.
        pass
    logger.setLevel(logging.DEBUG)

    try:
        worker_thread_num_per_process = 50
        func_table = {"mod_acl": adl.modify_acl_entries, "set_acl": adl.set_acl, "rem_acl": adl.remove_acl_entries}
        function_thread_pool = ThreadPoolExecutor(max_workers=worker_thread_num_per_process)
        adl_function = func_table[method_name]
        logger.log(logging.DEBUG, "Started processor pid:" + str(os.getpid()))

        def func_wrapper(func, path, spec):
            # Best-effort: failures are logged but never propagated, so one
            # bad path cannot kill the whole batch.
            try:
                func(path=path, acl_spec=spec)
            except FileNotFoundError:
                logger.exception("File " + str(path) + " not found")
                # Exception is being logged in the relevant acl method. Do nothing here
            except Exception:
                # TODO Raise to parent process. Was a silent bare 'except: pass';
                # at least record what went wrong.
                logger.exception("Failed on path: " + str(path))

            logger.log(logging.DEBUG, "Completed running on path:" + str(path))

        while not finish_queue_processing_flag.is_set():
            try:
                file_paths = file_path_queue.get(timeout=0.1)
                file_path_queue.task_done()  # Will not be called if empty
                for file_path in file_paths:
                    logger.log(logging.DEBUG, "Starting on path:" + str(file_path))
                    function_thread_pool.submit(func_wrapper, adl_function, file_path, acl_spec)
            except Empty:
                pass

    except Exception as e:
        import traceback
        # TODO Raise to parent process
        logger.exception("Exception in pid " + str(os.getpid()) + "Exception: " + str(e))
        exception_queue.put(traceback.format_exc())
    finally:
        function_thread_pool.shutdown()  # Blocking call. Will wait till all threads are done executing.
        logger.log(logging.DEBUG, "Finished processor pid: " + str(os.getpid()))

azure/datalake/store/utils.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import os
1212
import platform
1313
import sys
14+
import threading
1415

1516
PY2 = sys.version_info.major == 2
1617

@@ -158,3 +159,38 @@ def clamp(n, smallest, largest):
158159
32
159160
"""
160161
return max(smallest, min(n, largest))
162+
163+
164+
class CountUpDownLatch:
    """CountUpDownLatch provides a thread safe implementation of Up Down latch.

    `increment` raises the outstanding count (and a lifetime total);
    `decrement` lowers it and wakes waiters when it reaches zero;
    `is_zero` blocks until the outstanding count is zero.
    """
    def __init__(self):
        self.lock = threading.Condition()  # guards val and total
        self.val = 0     # outstanding (not yet completed) count
        self.total = 0   # lifetime number of increments

    def increment(self):
        """Register one more outstanding item."""
        with self.lock:  # with-statement releases even on error (raw acquire/release did not)
            self.val += 1
            self.total += 1

    def decrement(self):
        """Mark one outstanding item as done, waking waiters at zero."""
        with self.lock:
            self.val -= 1
            if self.val <= 0:
                self.lock.notify_all()  # notifyAll() is a deprecated alias

    def total_processed(self):
        """Return the lifetime number of increments."""
        with self.lock:
            return self.total

    def is_zero(self):
        """Block until the outstanding count reaches zero; always returns True."""
        with self.lock:
            while self.val > 0:
                self.lock.wait()
            return True

tests/settings.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
SUBSCRIPTION_ID = fake_settings.SUBSCRIPTION_ID
2020
RESOURCE_GROUP_NAME = fake_settings.RESOURCE_GROUP_NAME
2121
RECORD_MODE = os.environ.get('RECORD_MODE', 'all').lower()
22+
AZURE_ACL_TEST_APPID = os.environ.get('AZURE_ACL_TEST_APPID')
2223
CLIENT_ID = os.environ['azure_service_principal']
2324
'''
2425
RECORD_MODE = os.environ.get('RECORD_MODE', 'none').lower()

0 commit comments

Comments
 (0)