
Commit 2c60307

Release version 0.0.41 (#273)
* Replace content summary api call with user side processing
* Move token check under retry
* Raise exception on repeated zero reads
* Expose timeout for AdlDownloader and AdlUploader
* Refactor walk
1 parent d929cf3 commit 2c60307

112 files changed: +117618 −24 lines


HISTORY.rst

Lines changed: 7 additions & 0 deletions

@@ -3,6 +3,13 @@
 Release History
 ===============
 
+0.0.41 (2019-01-31)
++++++++++++++++++++
+* Remove GetContentSummary api call
+* Move check_token() under retry block
+* Expose timeout parameter for AdlDownloader and AdlUploader
+* Raise an exception instead of silently break for zero length reads
+
 0.0.40 (2019-01-08)
 +++++++++++++++++++
 * Fix zero length read

azure/datalake/store/__init__.py

Lines changed: 1 addition & 1 deletion

@@ -6,7 +6,7 @@
 # license information.
 # --------------------------------------------------------------------------
 
-__version__ = "0.0.40"
+__version__ = "0.0.41"
 
 from .core import AzureDLFileSystem
 from .multithread import ADLDownloader

azure/datalake/store/core.py

Lines changed: 52 additions & 12 deletions

@@ -23,7 +23,7 @@
 
 
 # local imports
-from .exceptions import DatalakeBadOffsetException
+from .exceptions import DatalakeBadOffsetException, DatalakeIncompleteTransferException
 from .exceptions import FileNotFoundError, PermissionError
 from .lib import DatalakeRESTInterface
 from .utils import ensure_writable, read_block
@@ -195,17 +195,31 @@ def info(self, path, invalidate_cache=True, expected_error_code=None):
 
         raise FileNotFoundError(path)
 
-    def _walk(self, path, invalidate_cache=True):
-        fi = list(self._ls(path, invalidate_cache))
+    def _walk(self, path, invalidate_cache=True, include_dirs=False):
+        ret = list(self._ls(path, invalidate_cache))
         self._emptyDirs = []
-        for apath in fi:
-            if apath['type'] == 'DIRECTORY':
-                sub_elements = self._ls(apath['name'], invalidate_cache)
+        current_subdirs = [f for f in ret if f['type'] != 'FILE']
+        while current_subdirs:
+            dirs_below_current_level = []
+            for apath in current_subdirs:
+                try:
+                    sub_elements = self._ls(apath['name'], invalidate_cache)
+                except FileNotFoundError:
+                    # Folder may have been deleted while walk is going on. Infrequent so we can take the linear hit
+                    ret.remove(apath)
+                    continue
                 if not sub_elements:
                     self._emptyDirs.append(apath)
                 else:
-                    fi.extend(sub_elements)
-        return [f for f in fi if f['type'] == 'FILE']
+                    ret.extend(sub_elements)
+                    dirs_below_current_level.extend([f for f in sub_elements if f['type'] != 'FILE'])
+            current_subdirs = dirs_below_current_level
+
+        if include_dirs:
+            return ret
+        else:
+            return [f for f in ret if f['type'] == 'FILE']
 
     def _empty_dirs_to_add(self):
         """ Returns directories found empty during walk. Only for internal use"""
@@ -240,9 +254,31 @@ def du(self, path, total=False, deep=False, invalidate_cache=True):
         return {p['name']: p['length'] for p in files}
 
     def df(self, path):
-        """ Resource summary of path """
+        """ Resource summary of path
+        Parameters
+        ----------
+        path: str
+            Location
+        """
         path = AzureDLPath(path).trim()
-        return self.azure.call('GETCONTENTSUMMARY', path.as_posix())['ContentSummary']
+        current_path_info = self.info(path, invalidate_cache=False)
+        if current_path_info['type'] == 'FILE':
+            return {'directoryCount': 0, 'fileCount': 1, 'length': current_path_info['length'], 'quota': -1,
+                    'spaceConsumed': current_path_info['length'], 'spaceQuota': -1}
+        else:
+            all_files_and_dirs = self._walk(path, include_dirs=True)
+            dir_count = 1  # 1 as walk doesn't return current directory
+            length = file_count = 0
+            for item in all_files_and_dirs:
+                length += item['length']
+                if item['type'] == 'FILE':
+                    file_count += 1
+                else:
+                    dir_count += 1
+
+            return {'directoryCount': dir_count, 'fileCount': file_count, 'length': length, 'quota': -1,
+                    'spaceConsumed': length, 'spaceQuota': -1}
 
     def chmod(self, path, mod):
         """ Change access mode of path
@@ -858,14 +894,18 @@ def read(self, length=-1):
             length = self.size
         if self.closed:
             raise ValueError('I/O operation on closed file.')
-
+        flag = 0
         out = b""
         while length > 0:
             self._read_blocksize()
             data_read = self.cache[self.loc - self.start:
                                    min(self.loc - self.start + length, self.end - self.start)]
             if not data_read:  # Check to catch possible server errors. Ideally shouldn't happen.
-                break
+                flag += 1
+                if flag >= 5:
+                    raise DatalakeIncompleteTransferException('Could not read data: {}. '
+                                                              'Repeated zero byte reads. '
+                                                              'Possible file corruption'.format(self.path))
             out += data_read
             self.loc += len(data_read)
             length -= len(data_read)
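
Note: this change is visible to callers. Where read() used to break out of its loop and silently hand back a short result, five consecutive zero-byte reads now raise. A sketch of defensive use, with a hypothetical file path (adl as in the sketch above):

from azure.datalake.store.exceptions import DatalakeIncompleteTransferException

try:
    with adl.open('/data/part-00000.csv', 'rb') as f:
        payload = f.read()
except DatalakeIncompleteTransferException:
    # the server kept returning zero bytes; treat the transfer as failed
    # instead of trusting a silently truncated payload
    payload = None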

azure/datalake/store/lib.py

Lines changed: 2 additions & 1 deletion

@@ -107,6 +107,7 @@ def auth(tenant_id=None, username=None,
     if not authority:
         authority = 'https://login.microsoftonline.com/'
 
+
     if not tenant_id:
         tenant_id = os.environ.get('azure_tenant_id', "common")
 
@@ -373,7 +374,6 @@ def call(self, op, path='', is_extended=False, expected_error_code=None, retry_p
         retry_policy = ExponentialRetryPolicy() if retry_policy is None else retry_policy
         if op not in self.ends:
             raise ValueError("No such op: %s", op)
-        self._check_token()
         method, required, allowed = self.ends[op]
         allowed.add('api-version')
         data = kwargs.pop('data', b'')
@@ -402,6 +402,7 @@ def call(self, op, path='', is_extended=False, expected_error_code=None, retry_p
         while True:
             retry_count += 1
             last_exception = None
+            self._check_token()
             try:
                 response = self.__call_once(method=method,
                                             url=url,
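
Note: the point of moving _check_token() is that exponential backoff can stretch a single call() across token expiry, so a token validated only once before the loop may be stale by a later attempt. A minimal sketch of the pattern, with hypothetical helper names rather than the library's internals:

import time

def call_with_retry(do_request, refresh_token_if_needed, max_tries=4):
    """Retry loop that re-validates credentials before every attempt."""
    retry_count = 0
    while True:
        retry_count += 1
        refresh_token_if_needed()  # inside the loop, as in this commit
        try:
            return do_request()
        except IOError:
            if retry_count >= max_tries:
                raise
            time.sleep(2 ** retry_count)  # exponential backoff between attempts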

azure/datalake/store/multithread.py

Lines changed: 12 additions & 4 deletions

@@ -101,14 +101,17 @@ class ADLDownloader(object):
         Callback for progress with signature function(current, total) where
         current is the number of bytes transfered so far, and total is the
         size of the blob, or None if the total size is unknown.
+    timeout: int (0)
+        Default value 0 means infinite timeout. Otherwise time in seconds before the
+        process will stop and raise an exception if transfer is still in progress
 
     See Also
     --------
     azure.datalake.store.transfer.ADLTransferClient
     """
     def __init__(self, adlfs, rpath, lpath, nthreads=None, chunksize=2**28,
                  buffersize=2**22, blocksize=2**22, client=None, run=True,
-                 overwrite=False, verbose=False, progress_callback=None):
+                 overwrite=False, verbose=False, progress_callback=None, timeout=0):
 
         # validate that the src exists and the current user has access to it
         # this only validates access to the top level folder. If there are files
@@ -133,7 +136,8 @@ def __init__(self, adlfs, rpath, lpath, nthreads=None, chunksize=2**28,
                 chunked=False,
                 verbose=verbose,
                 parent=self,
-                progress_callback=progress_callback)
+                progress_callback=progress_callback,
+                timeout=timeout)
         self._name = tokenize(adlfs, rpath, lpath, chunksize, blocksize)
         self.rpath = rpath
         self.lpath = lpath
@@ -378,14 +382,17 @@ class ADLUploader(object):
         Callback for progress with signature function(current, total) where
         current is the number of bytes transfered so far, and total is the
         size of the blob, or None if the total size is unknown.
+    timeout: int (0)
+        Default value 0 means infinite timeout. Otherwise time in seconds before the
+        process will stop and raise an exception if transfer is still in progress
 
     See Also
     --------
     azure.datalake.store.transfer.ADLTransferClient
     """
     def __init__(self, adlfs, rpath, lpath, nthreads=None, chunksize=2**28,
                  buffersize=2**22, blocksize=2**22, client=None, run=True,
-                 overwrite=False, verbose=False, progress_callback=None):
+                 overwrite=False, verbose=False, progress_callback=None, timeout=0):
 
         if client:
             self.client = client
@@ -402,7 +409,8 @@ def __init__(self, adlfs, rpath, lpath, nthreads=None, chunksize=2**28,
                 parent=self,
                 verbose=verbose,
                 unique_temporary=True,
-                progress_callback=progress_callback)
+                progress_callback=progress_callback,
+                timeout=timeout)
         self._name = tokenize(adlfs, rpath, lpath, chunksize, blocksize)
         self.rpath = AzureDLPath(rpath)
         self.lpath = lpath
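
Note: the new timeout parameter is plumbed straight through to the transfer client. A usage sketch with hypothetical store and paths; the default timeout=0 keeps the old behavior of waiting indefinitely (token and adl as in the earlier sketch):

from azure.datalake.store import multithread

# stop and raise if the download is still in progress after 10 minutes
multithread.ADLDownloader(adl, rpath='/remote/logs', lpath='./logs',
                          nthreads=16, overwrite=True, timeout=600)

# ADLUploader takes the same parameter
multithread.ADLUploader(adl, rpath='/remote/backup', lpath='./backup',
                        timeout=600)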

azure/datalake/store/transfer.py

Lines changed: 6 additions & 2 deletions

@@ -164,6 +164,9 @@ class ADLTransferClient(object):
         Callback for progress with signature function(current, total) where
         current is the number of bytes transferred so far, and total is the
         size of the blob, or None if the total size is unknown.
+    timeout: int (0)
+        Default value 0 means infinite timeout. Otherwise time in seconds before the
+        process will stop and raise an exception if transfer is still in progress
 
     Temporary Files
     ---------------
@@ -233,7 +236,7 @@ def __init__(self, adlfs, transfer, merge=None, nthreads=None,
                  chunksize=2**28, blocksize=2**25, chunked=True,
                  unique_temporary=True, delimiter=None,
                  parent=None, verbose=False, buffersize=2**25,
-                 progress_callback=None):
+                 progress_callback=None, timeout=0):
         self._adlfs = adlfs
         self._parent = parent
         self._transfer = transfer
@@ -247,6 +250,7 @@ def __init__(self, adlfs, transfer, merge=None, nthreads=None,
         self._unique_str = uuid.uuid4().hex
         self._progress_callback=progress_callback
         self._progress_lock = threading.Lock()
+        self._timeout = timeout
         self.verbose = verbose
 
         # Internal state tracking files/chunks/futures
@@ -496,7 +500,7 @@ def run(self, nthreads=None, monitor=True, before_start=None):
             self._start(src, dst)
 
         if monitor:
-            self.monitor()
+            self.monitor(timeout=self._timeout)
         has_errors = False
         error_list = []
         for f in self.progress: