
Commit 5ad9bf3

Remove concat operation from multi-part upload
Merge commit with 2 parents: 693ed62 + 4e69833

4 files changed: 16 additions, 35 deletions

HISTORY.rst

Lines changed: 3 additions & 0 deletions
@@ -2,6 +2,9 @@
 
 Release History
 ===============
+1.0.1 (2025-05-16)
+++++++++++++++++++++
+* Remove concat operation from multi-part upload. Uploads of large files will now be done in a single chunk.
 
 1.0.0-alpha0 (2024-07-12)
 +++++++++++++++++++++++++
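
The user-visible effect of this entry: ADLUploader no longer needs an explicit chunksize, and a large file is sent as one chunk instead of being split into temporary part files and concatenated. A minimal usage sketch of the updated behaviour (the store name, paths, and auth flow here are illustrative, not part of this commit):

from azure.datalake.store import core, lib, multithread

# Interactive / device-code login; 'mystore' is a hypothetical store name.
token = lib.auth()
adls = core.AzureDLFileSystem(token, store_name='mystore')

# No chunksize argument: after this change it defaults to None, so the whole
# local file is uploaded as a single chunk and no concat/merge step runs.
multithread.ADLUploader(adls, rpath='/data/big.bin', lpath='big.bin',
                        nthreads=4, overwrite=True)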

azure/datalake/store/multithread.py

Lines changed: 6 additions & 31 deletions
@@ -376,7 +376,7 @@ class ADLUploader(object):
         not supported.
     nthreads: int [None]
         Number of threads to use. If None, uses the number of cores.
-    chunksize: int [2**28]
+    chunksize: int [None]
         Number of bytes for a chunk. Large files are split into chunks. Files
         smaller than this number will always be transferred in a single thread.
     buffersize: int [2**22]
@@ -408,7 +408,7 @@ class ADLUploader(object):
     --------
     azure.datalake.store.transfer.ADLTransferClient
     """
-    def __init__(self, adlfs, rpath, lpath, nthreads=None, chunksize=2**28,
+    def __init__(self, adlfs, rpath, lpath, nthreads=None, chunksize=None,
                  buffersize=2**22, blocksize=2**22, client=None, run=True,
                  overwrite=False, verbose=False, progress_callback=None, timeout=0):
 
@@ -418,11 +418,11 @@ def __init__(self, adlfs, rpath, lpath, nthreads=None, chunksize=2**28,
             self.client = ADLTransferClient(
                 adlfs,
                 transfer=put_chunk,
-                merge=merge_chunks,
                 nthreads=nthreads,
-                chunksize=chunksize,
+                chunksize=None,
                 buffersize=buffersize,
                 blocksize=blocksize,
+                chunked=False,
                 delimiter=None,  # TODO: see utils.cs for what is required to support delimiters.
                 parent=self,
                 verbose=verbose,
@@ -585,35 +585,10 @@ def put_chunk(adlfs, src, dst, offset, size, buffersize, blocksize, delimiter=None
                            return nbytes, None
                        data = read_block(fin, o, miniblock, delimiter)
                        nbytes += fout.write(data)
-
+
     except Exception as e:
         exception = repr(e)
         logger.error('Upload failed %s; %s', src, exception)
         return nbytes, exception
     logger.debug('Uploaded from %s, byte offset %s', src, offset)
-    return nbytes, None
-
-
-def merge_chunks(adlfs, outfile, files, shutdown_event=None, overwrite=False):
-    try:
-        # note that it is assumed that only temp files from this run are in the segment folder created.
-        # so this call is optimized to instantly delete the temp folder on concat.
-        # if somehow the target file was created between the beginning of upload
-        # and concat, we will remove it if the user specified overwrite.
-        # here we must get the most up to date information from the service,
-        # instead of relying on the local cache to ensure that we know if
-        # the merge target already exists.
-        if adlfs.exists(outfile, invalidate_cache=True):
-            if overwrite:
-                adlfs.remove(outfile, True)
-            else:
-                raise FileExistsError(outfile)
-
-        adlfs.concat(outfile, files, delete_source=True)
-    except Exception as e:
-        exception = repr(e)
-        logger.error('Merged failed %s; %s', outfile, exception)
-        return exception
-    logger.debug('Merged %s', outfile)
-    adlfs.invalidate_cache(outfile)
-    return None
+    return nbytes, None
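
With merge_chunks gone, put_chunk is the only transfer callback left: it streams the whole file into the destination in block-sized reads, so there are no temporary part files to concatenate afterwards. A stripped-down, standard-library-only sketch of that write loop (put_single_chunk is an illustrative helper operating on local paths, not the library's API):

def put_single_chunk(src_path, dst_path, blocksize=2**22):
    """Stream one source file into one destination file, block by block."""
    nbytes = 0
    with open(src_path, 'rb') as fin, open(dst_path, 'wb') as fout:
        while True:
            data = fin.read(blocksize)
            if not data:
                break
            nbytes += fout.write(data)
    return nbytes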

azure/datalake/store/transfer.py

Lines changed: 6 additions & 3 deletions
@@ -285,8 +285,11 @@ def submit(self, src, dst, length):
         else:
             tmpdir = None
 
-        # TODO: might need xrange support for py2
-        offsets = range(0, length, self._chunksize)
+        if self._chunksize is None:
+            offsets = [0]  # Treat the entire file as a single chunk
+        else:
+            # TODO: might need xrange support for py2
+            offsets = range(0, length, self._chunksize)
 
         # in the case of empty files, ensure that the initial offset of 0 is properly added.
         if not offsets:
@@ -304,7 +307,7 @@ def submit(self, src, dst, length):
             cstates[(name, offset)] = 'pending'
             self._chunks[(name, offset)] = {
                 "parent": (src, dst),
-                "expected": min(length - offset, self._chunksize),
+                "expected": min(length - offset, self._chunksize or length),
                 "actual": 0,
                 "exception": None}
             logger.debug("Submitted %s, byte offset %d", name, offset)

tests/test_multithread.py

Lines changed: 1 addition & 1 deletion
@@ -300,7 +300,7 @@ def test_upload_one(local_files, azure):
     # multiple chunks, one thread
     size = 10000
     up = ADLUploader(azure, test_dir / 'bigfile', bigfile, nthreads=1,
-                     chunksize=size//5, client=client, run=False,
+                     client=client, run=False,
                      overwrite=True)
     up.run()
 