Commit c80bc96

Various bug fixes. (#243)
* Fix for test case failure by adding randomized file path
* Fix for empty folder upload issue
* Fix chunked downloader to make block size requests
1 parent d64d9d8 commit c80bc96

5 files changed: +50 -11 lines changed


HISTORY.rst

Lines changed: 6 additions & 0 deletions
@@ -3,6 +3,12 @@
 Release History
 ===============
 
+0.0.32 (2018-10-04)
++++++++++++++++++++
+* Fixed test bug
+* Fixed empty folder upload bug
+* Fixed ADL Downloader block size bug
+
 0.0.31 (2018-09-10)
 +++++++++++++++++++
 * Added support for batched ls

azure/datalake/store/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -6,7 +6,7 @@
 # license information.
 # --------------------------------------------------------------------------
 
-__version__ = "0.0.31"
+__version__ = "0.0.32"
 
 from .core import AzureDLFileSystem
 from .multithread import ADLDownloader

azure/datalake/store/multithread.py

Lines changed: 22 additions & 8 deletions
@@ -289,17 +289,23 @@ def get_chunk(adlfs, src, dst, offset, size, buffersize, blocksize,
                                           exponential_factor=backoff)
     try:
         nbytes = 0
-        with closing(_fetch_range(adlfs.azure, src, start=offset,
-                                  end=offset+size, stream=True, retry_policy=retry_policy)) as response:
-            with open(dst, 'rb+') as fout:
-                fout.seek(offset)
-                for chunk in response.iter_content(chunk_size=blocksize):
+        start = offset
+
+        with open(dst, 'rb+') as fout:
+            fout.seek(start)
+            while start < offset+size:
+                with closing(_fetch_range(adlfs.azure, src, start=start,
+                                          end=min(start+blocksize, offset+size), stream=True, retry_policy=retry_policy)) as response:
+                    chunk = response.content
                     if shutdown_event and shutdown_event.is_set():
                         return total_bytes_downloaded, None
                     if chunk:
                         nwritten = fout.write(chunk)
                         if nwritten:
                             nbytes += nwritten
+                            start += nwritten
+                        else:
+                            raise IOError("Failed to write to disk for {0} at location {1} with blocksize {2}".format(dst, start, blocksize))
         logger.debug('Downloaded %s bytes to %s, byte offset %s', nbytes, dst, offset)
 
     # There are certain cases where we will be throttled and receive less than the expected amount of data.
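
This hunk is the heart of the downloader fix: instead of one streaming request per chunk, where a throttled response could deliver fewer bytes than requested, each pass through the loop issues a ranged read of at most blocksize bytes and advances the file cursor only by the bytes actually written. A minimal standalone sketch of the same pattern, using plain requests against a hypothetical URL rather than this library's _fetch_range:

import requests

def download_in_blocks(url, dst, offset, size, blocksize=4 * 2**20):
    """Download bytes [offset, offset+size) of url into dst, one ranged GET per block."""
    start = offset
    with open(dst, 'rb+') as fout:  # dst is assumed to be pre-allocated, as in get_chunk
        fout.seek(start)
        while start < offset + size:
            end = min(start + blocksize, offset + size) - 1  # HTTP Range is inclusive
            resp = requests.get(url, headers={'Range': 'bytes={0}-{1}'.format(start, end)})
            resp.raise_for_status()
            nwritten = fout.write(resp.content)
            if not nwritten:
                raise IOError('failed to write to {0} at offset {1}'.format(dst, start))
            start += nwritten  # advance only by what actually reached disk

Because start advances by nwritten rather than by blocksize, a short response is simply re-requested from wherever the file actually stands, which is the recovery path the throttling comment at the end of the hunk alludes to.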
@@ -456,9 +462,12 @@ def _setup(self):
         """
         is_path_walk_empty = False
         if "*" not in self.lpath:
-            out = os.walk(self.lpath)
-            lfiles = sum(([os.path.join(dir, f) for f in fnames] for
-                          (dir, _, fnames) in out), [])
+            lfiles = []
+            for directory, subdir, fnames in os.walk(self.lpath):
+                lfiles.extend([os.path.join(directory, f) for f in fnames])
+                if not subdir and not fnames:  # Empty Directory
+                    self.client._adlfs._emptyDirs.append(directory)
+
         if (not lfiles and os.path.exists(self.lpath) and
                 not os.path.isdir(self.lpath)):
             lfiles = [self.lpath]
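
The rewritten walk is what makes empty-directory detection possible: os.walk yields a (directory, subdirs, fnames) triple for every directory visited, and a directory is an empty leaf exactly when both subdirs and fnames are empty, the condition the new loop keys on. A quick illustration, with a hypothetical root path:

import os

all_files, empty_dirs = [], []
for directory, subdirs, fnames in os.walk('/tmp/upload-root'):
    all_files.extend(os.path.join(directory, f) for f in fnames)
    if not subdirs and not fnames:  # no children at all: an empty leaf
        empty_dirs.append(directory)

The old sum(..., []) one-liner only ever saw file names, so directories containing no files were invisible to the uploader; the explicit loop collects the same file list while also recording the empty leaves on the filesystem object for creation later.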
@@ -502,6 +511,11 @@ def run(self, nthreads=None, monitor=True):
         monitor: bool [True]
             To watch and wait (block) until completion.
         """
+        for empty_directory in self.client._adlfs._empty_dirs_to_add():
+            local_rel_path = os.path.relpath(empty_directory, self.lpath)
+            rel_rpath = str(AzureDLPath(self.rpath).trim().globless_prefix / local_rel_path)
+            self.client._adlfs.mkdir(rel_rpath)
+
         self.client.run(nthreads, monitor)
 
     def active(self):
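
At run() time each recorded empty directory is rebased from the local upload root onto the remote root and created with mkdir. The real code routes the remote side through AzureDLPath(...).trim().globless_prefix to drop any glob components; a simplified sketch of the path arithmetic, with hypothetical paths and plain joins:

import os
import posixpath

lpath = '/home/user/dir1'               # local upload root
rpath = '/data/dir1'                    # remote destination root
empty_directory = '/home/user/dir1/b'   # recorded during the walk above

local_rel_path = os.path.relpath(empty_directory, lpath)  # 'b'
rel_rpath = posixpath.join(rpath, local_rel_path)         # '/data/dir1/b'
# adlfs.mkdir(rel_rpath) would then create the empty directory remotely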

tests/test_core.py

Lines changed: 1 addition & 1 deletion
@@ -836,7 +836,7 @@ def test_tail_head(azure):
 
 @my_vcr.use_cassette
 def test_read_delimited_block(azure):
-    fn = '/tmp/test/a'
+    fn = a
     delimiter = b'\n'
     data = delimiter.join([b'123', b'456', b'789'])
     with azure_teardown(azure):
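
The hard-coded '/tmp/test/a' was shared across runs, which the commit message cites as the cause of the test failure; a is a module-level path the test helpers derive from a randomized per-run test directory, so repeated or concurrent runs no longer collide. A sketch of that pattern, with hypothetical names rather than the repo's actual helpers:

import uuid
import posixpath

test_dir = posixpath.join('azure_test_dir', str(uuid.uuid4()))  # fresh root per run
a = posixpath.join(test_dir, 'a')  # what the test now uses as fn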

tests/test_multithread.py

Lines changed: 20 additions & 1 deletion
@@ -412,4 +412,23 @@ def test_download_root_folder(azure, tempdir):
     rpath = AzureDLPath('/' / test_dir / 'data/single/single' / 'single.txt')
     ADLDownloader(azure, rpath=rpath, lpath=tempdir)
     assert os.path.isfile(os.path.join(tempdir, 'single.txt'))
-
+
+@my_vcr.use_cassette
+def test_upload_empty_folder(tempdir, azure):
+    with azure_teardown(azure):
+        os.mkdir(os.path.join(tempdir, "dir1"))
+        os.mkdir(os.path.join(tempdir, "dir1", "b"))
+
+        with open(os.path.join(tempdir, "dir1", "file.txt"), 'wb') as f:
+            f.write(b'0123456789')
+
+        # transfer client w/ deterministic temporary directory
+        from azure.datalake.store.multithread import put_chunk
+        client = ADLTransferClient(azure, transfer=put_chunk,
+                                   unique_temporary=False)
+
+        # single chunk, empty file
+        up = ADLUploader(azure, test_dir / "dir1", os.path.join(tempdir, "dir1"), nthreads=1,
+                         overwrite=True)
+        assert azure.info(test_dir / "dir1" / "b")['type'] == 'DIRECTORY'
+        azure.rm(test_dir / "dir1", recursive=True)
