Skip to content

Commit e535970

Browse files
authored
Don't pass default acls for files when doing recursive operations (#323)
* Don't pass default acls for files when doing recursive operations * Remove check for dir. And upgrade version
1 parent 224633f commit e535970

File tree

6 files changed

+32286
-15802
lines changed

6 files changed

+32286
-15802
lines changed

HISTORY.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ Release History
66
0.0.52 (2020-11-25)
77
+++++++++++++++++++
88
* Changed logging verbosity when closing a stream
9+
* Filter out default acl for files when using recursive acl operations
910

1011
0.0.51 (2020-10-15)
1112
+++++++++++++++++++

azure/datalake/store/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
# license information.
77
# --------------------------------------------------------------------------
88

9-
__version__ = "0.0.51"
9+
__version__ = "0.0.52"
1010

1111
from .core import AzureDLFileSystem
1212
from .multithread import ADLDownloader

azure/datalake/store/multiprocessor.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88
from .exceptions import FileNotFoundError
99
try:
1010
from queue import Empty # Python 3
11+
import _thread
1112
except ImportError:
1213
from Queue import Empty # Python 2
14+
import thread
1315

1416
WORKER_THREAD_PER_PROCESS = 50
1517
QUEUE_BUCKET_SIZE = 10
@@ -37,7 +39,7 @@ def monitor_exception(exception_queue, process_ids):
3739
logger.log(logging.DEBUG, "Joining processes")
3840
for p in process_ids:
3941
p.join()
40-
import thread
42+
4143
logger.log(logging.DEBUG, "Interrupting main")
4244
raise Exception(local_exception)
4345
except Empty:
@@ -85,10 +87,13 @@ def walk(walk_path):
8587
if files['type'] == 'DIRECTORY':
8688
dir_processed_counter.increment() # A new directory to process
8789
walk_thread_pool.submit(walk, files['name'])
88-
paths.append(files['name'])
90+
91+
paths.append((files['name'], files['type'] == 'FILE'))
92+
8993
if len(paths) == QUEUE_BUCKET_SIZE:
9094
file_path_queue.put(list(paths))
9195
paths = []
96+
9297
if paths != []:
9398
file_path_queue.put(list(paths)) # For leftover paths < bucket_size
9499
except FileNotFoundError:
@@ -116,7 +121,7 @@ def walk(walk_path):
116121
walk_thread_pool = ThreadPoolExecutor(max_workers=WORKER_THREAD_PER_PROCESS)
117122

118123
# Root directory needs to be explicitly passed
119-
file_path_queue.put([path])
124+
file_path_queue.put([(path, False)])
120125
dir_processed_counter.increment()
121126

122127
# Processing starts here
@@ -149,6 +154,8 @@ def walk(walk_path):
149154
def processor(adl, file_path_queue, finish_queue_processing_flag, method_name, acl_spec, log_queue, exception_queue):
150155
logger = logging.getLogger(__name__)
151156

157+
removed_default_acl_spec = ",".join([x for x in acl_spec.split(',') if not x.lower().startswith("default")])
158+
152159
try:
153160
logger.addHandler(logging.handlers.QueueHandler(log_queue))
154161
logger.propagate = False # Prevents double logging
@@ -178,8 +185,14 @@ def func_wrapper(func, path, spec):
178185
file_paths = file_path_queue.get(timeout=0.1)
179186
file_path_queue.task_done() # Will not be called if empty
180187
for file_path in file_paths:
188+
is_file = file_path[1]
189+
if is_file:
190+
spec = removed_default_acl_spec
191+
else:
192+
spec = acl_spec
193+
181194
logger.log(logging.DEBUG, "Starting on path:" + str(file_path))
182-
function_thread_pool.submit(func_wrapper, adl_function, file_path, acl_spec)
195+
function_thread_pool.submit(func_wrapper, adl_function, file_path[0], spec)
183196
except Empty:
184197
pass
185198

0 commit comments

Comments
 (0)