diff --git a/fsspec/generic.py b/fsspec/generic.py index bee559ec4..15151f330 100644 --- a/fsspec/generic.py +++ b/fsspec/generic.py @@ -1,7 +1,13 @@ +from __future__ import annotations + import inspect import logging +import os +import shutil +import uuid +from typing import Optional -from .asyn import AsyncFileSystem +from .asyn import AsyncFileSystem, _run_coros_in_chunks, sync_wrapper from .callbacks import _DEFAULT_CALLBACK from .core import filesystem, get_filesystem_class, split_protocol, url_to_fs @@ -52,19 +58,23 @@ def rsync( Parameters ---------- source: str - Root of the directory tree to take files from. + Root of the directory tree to take files from. This must be a directory, but + do not include any terminating "/" character destination: str Root path to copy into. The contents of this location should be - identical to the contents of ``source`` when done. + identical to the contents of ``source`` when done. This will be made a + directory, and the terminal "/" should not be included. delete_missing: bool If there are paths in the destination that don't exist in the source and this is True, delete them. Otherwise, leave them alone. - source_field: str + source_field: str | callable If ``update_field`` is "different", this is the key in the info - of source files to consider for difference. - dest_field: str + of source files to consider for difference. Maybe a function of the + info dict. + dest_field: str | callable If ``update_field`` is "different", this is the key in the info - of destination files to consider for difference. + of destination files to consider for difference. May be a function of + the info dict. update_cond: "different"|"always"|"never" If "always", every file is copied, regardless of whether it exists in the destination. 
If "never", files that exist in the destination are @@ -91,9 +101,10 @@ def rsync( if v["type"] == "directory" and a.replace(source, destination) not in otherfiles ] logger.debug(f"{len(dirs)} directories to create") - for dirn in dirs: - # no async - fs.mkdirs(dirn.replace(source, destination), exist_ok=True) + if dirs: + fs.make_many_dirs( + [dirn.replace(source, destination) for dirn in dirs], exist_ok=True + ) allfiles = {a: v for a, v in allfiles.items() if v["type"] == "file"} logger.debug(f"{len(allfiles)} files to consider for copy") to_delete = [ @@ -107,7 +118,10 @@ def rsync( if update_cond == "always": allfiles[k] = otherfile elif update_cond == "different": - if v[source_field] != otherfiles[otherfile][dest_field]: + inf1 = source_field(v) if callable(source_field) else v[source_field] + v2 = otherfiles[otherfile] + inf2 = dest_field(v2) if callable(dest_field) else v2[dest_field] + if inf1 != inf2: # details mismatch, make copy allfiles[k] = otherfile else: @@ -116,12 +130,12 @@ def rsync( else: # file not in target yet allfiles[k] = otherfile + logger.debug(f"{len(allfiles)} files to copy") if allfiles: source_files, target_files = zip(*allfiles.items()) - logger.debug(f"{len(source_files)} files to copy") fs.cp(source_files, target_files, **kwargs) + logger.debug(f"{len(to_delete)} files to delete") if delete_missing: - logger.debug(f"{len(to_delete)} files to delete") fs.rm(to_delete) @@ -166,11 +180,11 @@ async def _find(self, path, maxdepth=None, withdirs=False, detail=False, **kwarg fs = _resolve_fs(path, self.method) if fs.async_impl: out = await fs._find( - path, maxdepth=maxdepth, withdirs=withdirs, detail=detail, **kwargs + path, maxdepth=maxdepth, withdirs=withdirs, detail=True, **kwargs ) else: out = fs.find( - path, maxdepth=maxdepth, withdirs=withdirs, detail=detail, **kwargs + path, maxdepth=maxdepth, withdirs=withdirs, detail=True, **kwargs ) result = {} for k, v in out.items(): @@ -239,6 +253,7 @@ async def _rm(self, url, **kwargs): 
async def copy_file_op(
    fs1, url1, fs2, url2, tempdir=None, batch_size=20, on_error="ignore"
):
    """Copy files from one filesystem to another via local staging files.

    Every ``url1[i]`` is fetched from ``fs1`` into a uniquely named file
    inside ``tempdir`` and then pushed to ``fs2`` as ``url2[i]``. The
    per-file coroutines are handed to ``_run_coros_in_chunks``.

    Parameters
    ----------
    fs1, fs2: filesystem instances
        Source and destination. Their ``async_impl`` attribute selects
        between the coroutine methods (``_get_file``/``_put_file``) and the
        blocking ones (``get_file``/``put_file``).
    url1, url2: list of str
        Source and target paths, paired by position with ``zip``.
    tempdir: str or None
        Directory for the staging files. When falsy, a fresh directory is
        made with ``tempfile.mkdtemp`` and removed again at the end. A
        directory supplied by the caller is created if it is missing and is
        never removed: only the staging files written by this call are
        deleted from it.
    batch_size: int or None
        Forwarded unchanged to ``_run_coros_in_chunks``.
    on_error: str
        Forwarded to each per-file copy: "raise" lets exceptions escape the
        copy, any other value logs them at debug level and moves on.
    """
    import tempfile

    # Only a directory created here may be deleted wholesale; running rmtree
    # on a caller's directory would destroy unrelated content stored in it.
    owns_tempdir = not tempdir
    if owns_tempdir:
        tempdir = tempfile.mkdtemp()
    else:
        os.makedirs(tempdir, exist_ok=True)
    try:
        coros = [
            _copy_file_op(
                fs1,
                u1,
                fs2,
                u2,
                os.path.join(tempdir, uuid.uuid4().hex),
                on_error=on_error,
            )
            for u1, u2 in zip(url1, url2)
        ]
        await _run_coros_in_chunks(coros, batch_size=batch_size)
    finally:
        if owns_tempdir:
            # ignore_errors: a cleanup failure must not replace the exception
            # (if any) that is already propagating from the transfers.
            shutil.rmtree(tempdir, ignore_errors=True)


async def _copy_file_op(fs1, url1, fs2, url2, local, on_error="ignore"):
    """Copy a single file from ``fs1`` to ``fs2`` through the path ``local``.

    The staging file ``local`` is removed whether or not the transfer
    succeeds. With ``on_error="raise"`` exceptions from the transfer
    propagate; with any other value they are logged at debug level and
    swallowed.
    """
    # An empty tuple in an ``except`` clause matches nothing, which is how
    # "raise" disables the handler below.
    ignored = () if on_error == "raise" else Exception
    logger.debug("Copy %s -> %s", url1, url2)
    try:
        if fs1.async_impl:
            await fs1._get_file(url1, local)
        else:
            fs1.get_file(url1, local)
        if fs2.async_impl:
            await fs2._put_file(local, url2)
        else:
            fs2.put_file(local, url2)
        logger.debug("Copy %s -> %s; done", url1, url2)
    except ignored as e:
        logger.debug("ignoring cp exception for %s: %s", url1, e)
    finally:
        # Drop the staging copy on every path so failed transfers do not
        # accumulate files in the staging directory.
        try:
            os.unlink(local)
        except FileNotFoundError:
            # the download never produced a file
            pass
        except OSError as e:
            logger.debug("could not remove temporary file %s: %s", local, e)