From b5033636d55c836d749e86de5852702903206920 Mon Sep 17 00:00:00 2001 From: Jonathan Langlois Date: Sat, 2 Sep 2023 18:33:07 +0900 Subject: [PATCH] fix: avoid expanding paths when source and dest are lists --- fsspec/asyn.py | 176 +++++++++++++++++++++++++------------------- fsspec/spec.py | 194 +++++++++++++++++++++++++++---------------------- 2 files changed, 207 insertions(+), 163 deletions(-) diff --git a/fsspec/asyn.py b/fsspec/asyn.py index 03b3fcc39..8571efbc0 100644 --- a/fsspec/asyn.py +++ b/fsspec/asyn.py @@ -344,31 +344,42 @@ async def _copy( elif on_error is None: on_error = "raise" - source_is_str = isinstance(path1, str) - paths = await self._expand_path(path1, maxdepth=maxdepth, recursive=recursive) - if source_is_str and (not recursive or maxdepth is not None): - # Non-recursive glob does not copy directories - paths = [p for p in paths if not (trailing_sep(p) or await self._isdir(p))] - if not paths: - return + if isinstance(path1, list) and isinstance(path2, list): + # No need to expand paths when both source and destination + # are provided as string + paths1 = path1 + paths2 = path2 + else: + source_is_str = isinstance(path1, str) + paths1 = await self._expand_path( + path1, maxdepth=maxdepth, recursive=recursive + ) + if source_is_str and (not recursive or maxdepth is not None): + # Non-recursive glob does not copy directories + paths1 = [ + p for p in paths1 if not (trailing_sep(p) or await self._isdir(p)) + ] + if not paths1: + return + + source_is_file = len(paths1) == 1 + dest_is_dir = isinstance(path2, str) and ( + trailing_sep(path2) or await self._isdir(path2) + ) - source_is_file = len(paths) == 1 - dest_is_dir = isinstance(path2, str) and ( - trailing_sep(path2) or await self._isdir(path2) - ) + exists = source_is_str and ( + (has_magic(path1) and source_is_file) + or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1)) + ) + paths2 = other_paths( + paths1, + path2, + exists=exists, + flatten=not source_is_str, + ) - exists = source_is_str and ( - (has_magic(path1) and source_is_file) - or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1)) - ) - path2 = other_paths( - paths, - path2, - exists=exists, - flatten=not source_is_str, - ) batch_size = batch_size or self.batch_size - coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths, path2)] + coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths1, paths2)] result = await _run_coros_in_chunks( coros, batch_size=batch_size, return_exceptions=True, nofiles=True ) @@ -503,33 +514,39 @@ async def _put( constructor, or for all instances by setting the "gather_batch_size" key in ``fsspec.config.conf``, falling back to 1/8th of the system limit . """ - source_is_str = isinstance(lpath, str) - if source_is_str: - lpath = make_path_posix(lpath) - fs = LocalFileSystem() - lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth) - if source_is_str and (not recursive or maxdepth is not None): - # Non-recursive glob does not copy directories - lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))] - if not lpaths: - return - - source_is_file = len(lpaths) == 1 - dest_is_dir = isinstance(rpath, str) and ( - trailing_sep(rpath) or await self._isdir(rpath) - ) + if isinstance(lpath, list) and isinstance(rpath, list): + # No need to expand paths when both source and destination + # are provided as string + rpaths = rpath + lpaths = lpath + else: + source_is_str = isinstance(lpath, str) + if source_is_str: + lpath = make_path_posix(lpath) + fs = LocalFileSystem() + lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth) + if source_is_str and (not recursive or maxdepth is not None): + # Non-recursive glob does not copy directories + lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))] + if not lpaths: + return + + source_is_file = len(lpaths) == 1 + dest_is_dir = isinstance(rpath, str) and ( + trailing_sep(rpath) or await self._isdir(rpath) + ) - rpath = self._strip_protocol(rpath) - exists = source_is_str and ( - (has_magic(lpath) and source_is_file) - or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath)) - ) - rpaths = other_paths( - lpaths, - rpath, - exists=exists, - flatten=not source_is_str, - ) + rpath = self._strip_protocol(rpath) + exists = source_is_str and ( + (has_magic(lpath) and source_is_file) + or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath)) + ) + rpaths = other_paths( + lpaths, + rpath, + exists=exists, + flatten=not source_is_str, + ) is_dir = {l: os.path.isdir(l) for l in lpaths} rdirs = [r for l, r in zip(lpaths, rpaths) if is_dir[l]] @@ -574,35 +591,44 @@ async def _get( constructor, or for all instances by setting the "gather_batch_size" key in ``fsspec.config.conf``, falling back to 1/8th of the system limit . """ - source_is_str = isinstance(rpath, str) - # First check for rpath trailing slash as _strip_protocol removes it. - source_not_trailing_sep = source_is_str and not trailing_sep(rpath) - rpath = self._strip_protocol(rpath) - rpaths = await self._expand_path(rpath, recursive=recursive, maxdepth=maxdepth) - if source_is_str and (not recursive or maxdepth is not None): - # Non-recursive glob does not copy directories - rpaths = [ - p for p in rpaths if not (trailing_sep(p) or await self._isdir(p)) - ] - if not rpaths: - return + if isinstance(lpath, list) and isinstance(rpath, list): + # No need to expand paths when both source and destination + # are provided as string + rpaths = rpath + lpaths = lpath + else: + source_is_str = isinstance(rpath, str) + # First check for rpath trailing slash as _strip_protocol removes it. + source_not_trailing_sep = source_is_str and not trailing_sep(rpath) + rpath = self._strip_protocol(rpath) + rpaths = await self._expand_path( + rpath, recursive=recursive, maxdepth=maxdepth + ) + if source_is_str and (not recursive or maxdepth is not None): + # Non-recursive glob does not copy directories + rpaths = [ + p for p in rpaths if not (trailing_sep(p) or await self._isdir(p)) + ] + if not rpaths: + return - lpath = make_path_posix(lpath) - source_is_file = len(rpaths) == 1 - dest_is_dir = isinstance(lpath, str) and ( - trailing_sep(lpath) or LocalFileSystem().isdir(lpath) - ) + lpath = make_path_posix(lpath) + source_is_file = len(rpaths) == 1 + dest_is_dir = isinstance(lpath, str) and ( + trailing_sep(lpath) or LocalFileSystem().isdir(lpath) + ) + + exists = source_is_str and ( + (has_magic(rpath) and source_is_file) + or (not has_magic(rpath) and dest_is_dir and source_not_trailing_sep) + ) + lpaths = other_paths( + rpaths, + lpath, + exists=exists, + flatten=not source_is_str, + ) - exists = source_is_str and ( - (has_magic(rpath) and source_is_file) - or (not has_magic(rpath) and dest_is_dir and source_not_trailing_sep) - ) - lpaths = other_paths( - rpaths, - lpath, - exists=exists, - flatten=not source_is_str, - ) [os.makedirs(os.path.dirname(lp), exist_ok=True) for lp in lpaths] batch_size = kwargs.pop("batch_size", self.batch_size) diff --git a/fsspec/spec.py b/fsspec/spec.py index ea6949c26..504a1aa05 100644 --- a/fsspec/spec.py +++ b/fsspec/spec.py @@ -938,38 +938,44 @@ def get( Calls get_file for each source. """ - from .implementations.local import ( - LocalFileSystem, - make_path_posix, - trailing_sep, - ) + if isinstance(lpath, list) and isinstance(rpath, list): + # No need to expand paths when both source and destination + # are provided as string + rpaths = rpath + lpaths = lpath + else: + from .implementations.local import ( + LocalFileSystem, + make_path_posix, + trailing_sep, + ) - source_is_str = isinstance(rpath, str) - rpaths = self.expand_path(rpath, recursive=recursive, maxdepth=maxdepth) - if source_is_str and (not recursive or maxdepth is not None): - # Non-recursive glob does not copy directories - rpaths = [p for p in rpaths if not (trailing_sep(p) or self.isdir(p))] - if not rpaths: - return + source_is_str = isinstance(rpath, str) + rpaths = self.expand_path(rpath, recursive=recursive, maxdepth=maxdepth) + if source_is_str and (not recursive or maxdepth is not None): + # Non-recursive glob does not copy directories + rpaths = [p for p in rpaths if not (trailing_sep(p) or self.isdir(p))] + if not rpaths: + return - if isinstance(lpath, str): - lpath = make_path_posix(lpath) + if isinstance(lpath, str): + lpath = make_path_posix(lpath) - source_is_file = len(rpaths) == 1 - dest_is_dir = isinstance(lpath, str) and ( - trailing_sep(lpath) or LocalFileSystem().isdir(lpath) - ) + source_is_file = len(rpaths) == 1 + dest_is_dir = isinstance(lpath, str) and ( + trailing_sep(lpath) or LocalFileSystem().isdir(lpath) + ) - exists = source_is_str and ( - (has_magic(rpath) and source_is_file) - or (not has_magic(rpath) and dest_is_dir and not trailing_sep(rpath)) - ) - lpaths = other_paths( - rpaths, - lpath, - exists=exists, - flatten=not source_is_str, - ) + exists = source_is_str and ( + (has_magic(rpath) and source_is_file) + or (not has_magic(rpath) and dest_is_dir and not trailing_sep(rpath)) + ) + lpaths = other_paths( + rpaths, + lpath, + exists=exists, + flatten=not source_is_str, + ) callback.set_size(len(lpaths)) for lpath, rpath in callback.wrap(zip(lpaths, rpaths)): @@ -1013,43 +1019,49 @@ def put( Calls put_file for each source. """ - from .implementations.local import ( - LocalFileSystem, - make_path_posix, - trailing_sep, - ) - - source_is_str = isinstance(lpath, str) - if source_is_str: - lpath = make_path_posix(lpath) - fs = LocalFileSystem() - lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth) - if source_is_str and (not recursive or maxdepth is not None): - # Non-recursive glob does not copy directories - lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))] - if not lpaths: - return + if isinstance(lpath, list) and isinstance(rpath, list): + # No need to expand paths when both source and destination + # are provided as string + rpaths = rpath + lpaths = lpath + else: + from .implementations.local import ( + LocalFileSystem, + make_path_posix, + trailing_sep, + ) - source_is_file = len(lpaths) == 1 - dest_is_dir = isinstance(rpath, str) and ( - trailing_sep(rpath) or self.isdir(rpath) - ) + source_is_str = isinstance(lpath, str) + if source_is_str: + lpath = make_path_posix(lpath) + fs = LocalFileSystem() + lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth) + if source_is_str and (not recursive or maxdepth is not None): + # Non-recursive glob does not copy directories + lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))] + if not lpaths: + return + + source_is_file = len(lpaths) == 1 + dest_is_dir = isinstance(rpath, str) and ( + trailing_sep(rpath) or self.isdir(rpath) + ) - rpath = ( - self._strip_protocol(rpath) - if isinstance(rpath, str) - else [self._strip_protocol(p) for p in rpath] - ) - exists = source_is_str and ( - (has_magic(lpath) and source_is_file) - or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath)) - ) - rpaths = other_paths( - lpaths, - rpath, - exists=exists, - flatten=not source_is_str, - ) + rpath = ( + self._strip_protocol(rpath) + if isinstance(rpath, str) + else [self._strip_protocol(p) for p in rpath] + ) + exists = source_is_str and ( + (has_magic(lpath) and source_is_file) + or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath)) + ) + rpaths = other_paths( + lpaths, + rpath, + exists=exists, + flatten=not source_is_str, + ) callback.set_size(len(rpaths)) for lpath, rpath in callback.wrap(zip(lpaths, rpaths)): @@ -1080,38 +1092,44 @@ def copy( not-found exceptions will cause the path to be skipped; defaults to raise unless recursive is true, where the default is ignore """ - from .implementations.local import trailing_sep - if on_error is None and recursive: on_error = "ignore" elif on_error is None: on_error = "raise" - source_is_str = isinstance(path1, str) - paths = self.expand_path(path1, recursive=recursive, maxdepth=maxdepth) - if source_is_str and (not recursive or maxdepth is not None): - # Non-recursive glob does not copy directories - paths = [p for p in paths if not (trailing_sep(p) or self.isdir(p))] - if not paths: - return - - source_is_file = len(paths) == 1 - dest_is_dir = isinstance(path2, str) and ( - trailing_sep(path2) or self.isdir(path2) - ) + if isinstance(path1, list) and isinstance(path2, list): + # No need to expand paths when both source and destination + # are provided as string + paths1 = path1 + paths2 = path2 + else: + from .implementations.local import trailing_sep + + source_is_str = isinstance(path1, str) + paths1 = self.expand_path(path1, recursive=recursive, maxdepth=maxdepth) + if source_is_str and (not recursive or maxdepth is not None): + # Non-recursive glob does not copy directories + paths1 = [p for p in paths1 if not (trailing_sep(p) or self.isdir(p))] + if not paths1: + return + + source_is_file = len(paths1) == 1 + dest_is_dir = isinstance(path2, str) and ( + trailing_sep(path2) or self.isdir(path2) + ) - exists = source_is_str and ( - (has_magic(path1) and source_is_file) - or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1)) - ) - path2 = other_paths( - paths, - path2, - exists=exists, - flatten=not source_is_str, - ) + exists = source_is_str and ( + (has_magic(path1) and source_is_file) + or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1)) + ) + paths2 = other_paths( + paths1, + path2, + exists=exists, + flatten=not source_is_str, + ) - for p1, p2 in zip(paths, path2): + for p1, p2 in zip(paths1, paths2): try: self.cp_file(p1, p2, **kwargs) except FileNotFoundError: