Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions azure/datalake/store/multithread.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,26 +195,42 @@ def clear_saved():
def hash(self):
return self._name



def _setup(self):
""" Create set of parameters to loop over
"""
if "*" not in self.rpath:
rfiles = self.client._adlfs.walk(self.rpath, details=True, invalidate_cache=True)
else:

def is_glob_path(path):
path = AzureDLPath(path).trim()
prefix = path.globless_prefix
return not path == prefix

if is_glob_path(self.rpath):
rfiles = self.client._adlfs.glob(self.rpath, details=True, invalidate_cache=True)
else:
rfiles = self.client._adlfs.walk(self.rpath, details=True, invalidate_cache=True)

if len(rfiles) == 1 and self.client._adlfs.info(self.rpath)['type'] == 'FILE':

if len(rfiles) == 0:
raise ValueError('No files to download')

# If only one file is returned we are not sure whether user specified a dir or a file to download,
# since walk gives the same result for both i.e walk("DirWithsingleFile") == walk("DirWithSingleFile\SingleFile)
# If user specified a file in rpath,
# then we want to download the file into lpath directly and not create another subdir for that.
# If user specified a dir that happens to contain only one file, we want to create the dir as well under lpath.
if len(rfiles) == 1 and not is_glob_path(self.rpath) and self.client._adlfs.info(self.rpath)['type'] == 'FILE':
if os.path.exists(self.lpath) and os.path.isdir(self.lpath):
file_pairs = [(os.path.join(self.lpath, os.path.basename(rfiles[0]['name'] + '.inprogress')),
rfiles[0])]
else:
file_pairs = [(self.lpath, rfiles[0])]
elif len(rfiles) >= 1:
else:
local_rel_rpath = str(AzureDLPath(self.rpath).trim().globless_prefix)
file_pairs = [(os.path.join(self.lpath, os.path.relpath(f['name'] +'.inprogress', local_rel_rpath)), f)
for f in rfiles]
else:
raise ValueError('No files to download')


# this property is used for internal validation
# and should not be referenced directly by public callers
Expand All @@ -231,6 +247,7 @@ def _setup(self):
self.client.submit(rfile['name'], lfile, rfile['length'])

return existing_files

def run(self, nthreads=None, monitor=True):
""" Populate transfer queue and execute downloads

Expand Down
Loading