4 changes: 2 additions & 2 deletions MANIFEST.in
@@ -1,4 +1,4 @@
recursive-include azure/datalake/store *.py
recursive-include azure/datalake/store/*.py
recursive-include docs *.rst

include setup.py
@@ -7,6 +7,6 @@ include LICENSE.txt
include MANIFEST.in
include HISTORY.rst
include requirements.txt
include azure_bdist_wheel.py
include azure/__init__.py

prune docs/_build
2 changes: 1 addition & 1 deletion azure/__init__.py
@@ -1 +1 @@
__import__('pkg_resources').declare_namespace(__name__)
__path__ = __import__('pkgutil').extend_path(__path__, __name__)
2 changes: 1 addition & 1 deletion azure/datalake/__init__.py
@@ -1 +1 @@
__import__('pkg_resources').declare_namespace(__name__)
__path__ = __import__('pkgutil').extend_path(__path__, __name__)
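
For context, the one-liner both `__init__.py` files switch to is the standard pkgutil-style namespace-package idiom: every distribution contributing subpackages to `azure` ships an identical file, and `extend_path` merges all matching directories found on `sys.path`. A minimal sketch (the `azure.example` sibling is hypothetical, used only to illustrate two distributions sharing the namespace):

```python
# azure/__init__.py -- shipped identically by every distribution that
# contributes subpackages to the shared "azure" namespace.
__path__ = __import__('pkgutil').extend_path(__path__, __name__)

# extend_path() scans sys.path for other directories that also contain
# an "azure" package and appends them to __path__, so separately
# installed portions stay importable side by side:
#   from azure.datalake.store import core   # this distribution
#   from azure.example import widget        # hypothetical sibling distribution
```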
2 changes: 2 additions & 0 deletions azure/datalake/store/core.py
@@ -864,6 +864,8 @@ def read(self, length=-1):
self._read_blocksize()
data_read = self.cache[self.loc - self.start:
min(self.loc - self.start + length, self.end - self.start)]
if not data_read: # Check to catch possible server errors. Ideally shouldn't happen.
break
out += data_read
self.loc += len(data_read)
length -= len(data_read)
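
The added guard prevents an infinite loop when the server returns an empty payload: without it, `data_read` is empty, `length` never decreases, and the `while` condition never fails. A minimal, runnable sketch of the same pattern (the `fetch_block` helper is hypothetical, not the actual `AzureDLFile` API):

```python
def read_all(fetch_block, length):
    """Minimal sketch of the fixed loop: fetch_block(n) returns up to n
    bytes, possibly b"" if the server misbehaves or the stream dries up."""
    out = b""
    while length > 0:
        data = fetch_block(min(length, 4 * 2**20))  # request up to 4 MB
        if not data:  # nothing came back: stop instead of looping forever
            break
        out += data
        length -= len(data)
    return out

# A stream that dries up early no longer hangs the loop:
chunks = iter([b"abc", b"de", b""])
print(read_all(lambda n: next(chunks), 10))  # b'abcde'
```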
82 changes: 40 additions & 42 deletions azure/datalake/store/multiprocessor.py
Expand Up @@ -6,33 +6,32 @@
import os
import logging.handlers
from .exceptions import FileNotFoundError


try:
from queue import Empty # Python 3
except ImportError:
from Queue import Empty # Python 2
end_queue_sentinel = [None, None]

exception = None
exception_lock = threading.Lock()
WORKER_THREAD_PER_PROCESS = 50
QUEUE_BUCKET_SIZE = 10
END_QUEUE_SENTINEL = [None, None]
GLOBAL_EXCEPTION = None
GLOBAL_EXCEPTION_LOCK = threading.Lock()


def monitor_exception(exception_queue, process_ids):
global exception
global GLOBAL_EXCEPTION
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

while True:
try:
excep = exception_queue.get(timeout=0.1)
if excep == end_queue_sentinel:
local_exception = exception_queue.get(timeout=0.1)
if local_exception == END_QUEUE_SENTINEL:
break
logger.log(logging.DEBUG, "Setting global exception")
exception_lock.acquire()
exception = excep
exception_lock.release()
GLOBAL_EXCEPTION_LOCK.acquire()
GLOBAL_EXCEPTION = local_exception
GLOBAL_EXCEPTION_LOCK.release()
logger.log(logging.DEBUG, "Closing processes")
for p in process_ids:
p.terminate()
@@ -41,7 +40,7 @@ def monitor_exception(exception_queue, process_ids):
p.join()
import thread
logger.log(logging.DEBUG, "Interrupting main")
raise Exception(excep)
raise Exception(local_exception)
except Empty:
pass

@@ -51,11 +50,11 @@ def log_listener_process(queue):
try:
record = queue.get(timeout=0.1)
queue.task_done()
if record == end_queue_sentinel: # We send this as a sentinel to tell the listener to quit.
if record == END_QUEUE_SENTINEL: # We send this as a sentinel to tell the listener to quit.
break
logger = logging.getLogger(record.name)
logger.handlers.clear()
logger.handle(record) # No level or filter logic applied - just do it!
logger.handle(record) # No level or filter logic applied - just do it!
except Empty: # Try again
pass
except Exception as e:
@@ -65,14 +64,12 @@ def log_listener_process(queue):


def multi_processor_change_acl(adl, path=None, method_name="", acl_spec="", number_of_sub_process=None):
log_queue = multiprocessing.JoinableQueue()
exception_queue = multiprocessing.Queue()
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
queue_bucket_size = 10
worker_thread_num_per_process = 50

def launch_processes(number_of_processes):
if number_of_processes is None:
number_of_processes = max(2, multiprocessing.cpu_count() - 1)
process_list = []
for i in range(number_of_processes):
process_list.append(multiprocessing.Process(target=processor,
@@ -84,46 +81,50 @@ def launch_processes(number_of_processes):
def walk(walk_path):
try:
paths = []
all_files = adl._ls(path=walk_path)
all_files = adl.ls(path=walk_path, detail=True)

for files in all_files:
if files['type'] == 'DIRECTORY':
dir_processed_counter.increment() # A new directory to process
walk_thread_pool.submit(walk, files['name'])
paths.append(files['name'])
if len(paths) == queue_bucket_size:
if len(paths) == QUEUE_BUCKET_SIZE:
file_path_queue.put(list(paths))
paths = []
if paths != []:
file_path_queue.put(list(paths)) # For leftover paths < bucket_size
except FileNotFoundError:
pass # Continue in case the file was deleted in between
except:
except Exception:
import traceback
logger.exception("Failed to walk for path: " + str(walk_path) + ". Exiting!")
exception_queue.put(traceback.format_exc())
finally:
dir_processed_counter.decrement() # Processing complete for this directory

# Initialize concurrency primitives
log_queue = multiprocessing.JoinableQueue()
exception_queue = multiprocessing.Queue()
finish_queue_processing_flag = multiprocessing.Event()
file_path_queue = multiprocessing.JoinableQueue()
if number_of_sub_process == None:
number_of_sub_process = max(2, multiprocessing.cpu_count()-1)
dir_processed_counter = CountUpDownLatch()

# Start relevant threads and processes
log_listener = threading.Thread(target=log_listener_process, args=(log_queue,))
log_listener.start()
child_processes = launch_processes(number_of_sub_process)
exception_monitor_thread = threading.Thread(target=monitor_exception, args=(exception_queue, child_processes))
exception_monitor_thread.start()
log_listener = threading.Thread(target=log_listener_process, args=(log_queue,))
log_listener.start()
walk_thread_pool = ThreadPoolExecutor(max_workers=WORKER_THREAD_PER_PROCESS)

dir_processed_counter = CountUpDownLatch()
walk_thread_pool = ThreadPoolExecutor(max_workers=worker_thread_num_per_process)

file_path_queue.put([path]) # Root directory needs to be passed
# Root directory needs to be explicitly passed
file_path_queue.put([path])
dir_processed_counter.increment()
walk(path) # Start processing root directory

if dir_processed_counter.is_zero(): # Done processing all directories. Blocking call.
# Processing starts here
walk(path)

if dir_processed_counter.is_zero(): # Done processing all directories. Blocking call.
walk_thread_pool.shutdown()
file_path_queue.close() # No new elements to add
file_path_queue.join() # Wait for operations to be done
@@ -135,11 +136,11 @@ def walk(walk_path):

# Cleanup
logger.log(logging.DEBUG, "Sending exception sentinel")
exception_queue.put(end_queue_sentinel)
exception_queue.put(END_QUEUE_SENTINEL)
exception_monitor_thread.join()
logger.log(logging.DEBUG, "Exception monitor thread finished")
logger.log(logging.DEBUG, "Sending logger sentinel")
log_queue.put(end_queue_sentinel)
log_queue.put(END_QUEUE_SENTINEL)
log_queue.join()
log_queue.close()
logger.log(logging.DEBUG, "Log queue closed")
@@ -159,21 +160,19 @@ def processor(adl, file_path_queue, finish_queue_processing_flag, method_name, a
logger.setLevel(logging.DEBUG)

try:
worker_thread_num_per_process = 50
func_table = {"mod_acl": adl.modify_acl_entries, "set_acl": adl.set_acl, "rem_acl": adl.remove_acl_entries}
function_thread_pool = ThreadPoolExecutor(max_workers=worker_thread_num_per_process)
function_thread_pool = ThreadPoolExecutor(max_workers=WORKER_THREAD_PER_PROCESS)
adl_function = func_table[method_name]
logger.log(logging.DEBUG, "Started processor pid:"+str(os.getpid()))

def func_wrapper(func, path, spec):
try:
func(path=path, acl_spec=spec)
except FileNotFoundError as e:
except FileNotFoundError:
logger.exception("File "+str(path)+" not found")
pass # Exception is being logged in the relevant acl method. Do nothing here
except:
# TODO Raise to parent process
pass
# Complete Exception is being logged in the relevant acl method. Don't print exception here
except Exception as e:
logger.exception("File " + str(path) + " not set. Exception "+str(e))

logger.log(logging.DEBUG, "Completed running on path:" + str(path))

@@ -189,7 +188,6 @@ def func_wrapper(func, path, spec):

except Exception as e:
import traceback
# TODO Raise to parent process
logger.exception("Exception in pid "+str(os.getpid())+"Exception: " + str(e))
exception_queue.put(traceback.format_exc())
finally:
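
For context, the renamed `END_QUEUE_SENTINEL` constant implements the usual queue-sentinel shutdown pattern used throughout this module: producers push a distinguished value, and the consumer loop exits when it sees it, exactly as `monitor_exception` and `log_listener_process` do above. A minimal, runnable sketch of the pattern (illustrative names, not this module's API):

```python
import queue
import threading

END_QUEUE_SENTINEL = [None, None]  # same idea as the module-level constant

def monitor(q, results):
    """Drain a queue until the sentinel arrives, mirroring monitor_exception."""
    while True:
        try:
            item = q.get(timeout=0.1)
        except queue.Empty:  # nothing yet; poll again
            continue
        if item == END_QUEUE_SENTINEL:
            break
        results.append(item)

q = queue.Queue()
results = []
t = threading.Thread(target=monitor, args=(q, results))
t.start()
q.put("work-1")
q.put("work-2")
q.put(END_QUEUE_SENTINEL)  # tell the consumer to shut down
t.join()
print(results)  # ['work-1', 'work-2']
```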
54 changes: 0 additions & 54 deletions azure_bdist_wheel.py

This file was deleted.

1 change: 0 additions & 1 deletion setup.cfg
@@ -1,3 +1,2 @@
[bdist_wheel]
universal=1
azure-namespace-package=azure-nspkg
16 changes: 6 additions & 10 deletions setup.py
@@ -1,15 +1,8 @@
#!/usr/bin/env python

import os
from setuptools import find_packages, setup
from io import open
import re
try:
from azure_bdist_wheel import cmdclass
except ImportError:
from distutils import log as logger
logger.warn("Wheel is not available, disabling bdist_wheel hook")
cmdclass = {}

with open('README.rst', encoding='utf-8') as f:
readme = f.read()
@@ -44,17 +37,20 @@
'Programming Language :: Python :: 3.6',
'License :: OSI Approved :: MIT License',
],
packages=find_packages(exclude=['tests']),
packages=find_packages(exclude=['tests',
# Exclude packages that will be covered by PEP420 or nspkg
'azure',
]),
install_requires=[
'cffi',
'adal>=0.4.2',
'requests>=2.20.0'
'requests>=2.20.0',
],
extras_require={
":python_version<'3.4'": ['pathlib2'],
":python_version<='2.7'": ['futures'],
":python_version<'3.0'": ['azure-nspkg'],
},
long_description=readme + '\n\n' + history,
zip_safe=False,
cmdclass=cmdclass
)
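
The `extras_require` keys above are PEP 508 environment markers, evaluated against the installing interpreter so each backport is pulled in only where it is needed. A rough, simplified sketch of the effect (not pip's actual resolution logic):

```python
import sys

# Simplified view of the markers in extras_require above: each backport
# is installed only when the running interpreter actually needs it.
conditional_requires = {
    "pathlib2":    sys.version_info < (3, 4),  # :python_version<'3.4'
    "futures":     sys.version_info < (2, 8),  # :python_version<='2.7'
    "azure-nspkg": sys.version_info < (3, 0),  # :python_version<'3.0'
}

for package, needed in conditional_requires.items():
    print(package, "->", "install" if needed else "skip")
```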