Skip to content

Commit 359c2e4

Browse files
authored
Merge pull request #189 from clehene/master
Fixed Multi chunk transfer hangs as merging chunks fails (#187)
2 parents 4ef1e75 + 4c6c152 commit 359c2e4

File tree

3 files changed

+33
-3
lines changed

3 files changed

+33
-3
lines changed

HISTORY.rst

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22
33
Release History
44
===============
5+
unreleased
6+
----------
7+
* Fixed Multi chunk transfer hangs as merging chunks fails #187
8+
59
0.0.15 (2017-07-26)
610
-------------------
711
* Enable Data Lake Store progress controller callback #174
@@ -87,4 +91,4 @@ Release History
8791
0.0.1 (2016-11-21)
8892
------------------
8993
* Initial preview release. Based on API version 2016-11-01.
90-
* Includes initial ADLS filesystem functionality and extended upload and download support.
94+
* Includes initial ADLS filesystem functionality and extended upload and download support.

azure/datalake/store/transfer.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -431,12 +431,14 @@ def _update(self, future):
431431
if self._merge and len(cstates.objects) > 1:
432432
logger.debug("Merging file: %s", self._fstates[parent])
433433
self._fstates[parent] = 'merging'
434-
merge_future = self._submit(
434+
merge_future = self._pool.submit(
435435
self._merge, self._adlfs, dst,
436436
[chunk for chunk, _ in sorted(cstates.objects,
437437
key=operator.itemgetter(1))],
438-
overwrite=self._parent._overwrite)
438+
overwrite=self._parent._overwrite,
439+
shutdown_event=self._shutdown_event)
439440
self._ffutures[merge_future] = parent
441+
merge_future.add_done_callback(self._update)
440442
else:
441443
if not self._chunked and str(dst).endswith('.inprogress'):
442444
logger.debug("Renaming file to remove .inprogress: %s", self._fstates[parent])

tests/test_transfer.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,30 @@ def transfer(adlfs, src, dst, offset, size, blocksize, buffersize, shutdown_even
8282
assert calls == [(8, 32), (16, 32), (24, 32), (32, 32)]
8383

8484

85+
def test_merge(azure):
86+
87+
calls = []
88+
89+
def merge(adlfs, outfile, files, shutdown_event=None, overwrite=False):
90+
calls.append(files)
91+
92+
def transfer(adlfs, src, dst, offset, size, blocksize, buffersize, shutdown_event=None):
93+
return size, None
94+
95+
class XLoaderMock(object):
96+
_overwrite = False
97+
98+
file_size = 32
99+
chunk_size = 8
100+
client = ADLTransferClient(azure, parent=XLoaderMock(), transfer=transfer, merge=merge,
101+
chunksize=chunk_size, chunked=True)
102+
103+
client.submit('foo', AzureDLPath('bar'), file_size)
104+
client.run()
105+
106+
assert len(calls[0]) == file_size / chunk_size
107+
108+
85109
def test_temporary_path(azure):
86110
def transfer(adlfs, src, dst, offset, size, blocksize, buffersize):
87111
return size, None

0 commit comments

Comments
 (0)