Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support cache mapper that is basename plus fixed number of parent directories #1318

Merged
merged 6 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions fsspec/implementations/cache_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,38 @@ def __hash__(self) -> int:


class BasenameCacheMapper(AbstractCacheMapper):
"""Cache mapper that uses the basename of the remote URL.
"""Cache mapper that uses the basename of the remote URL and a fixed number
of directory levels above this.

Different paths with the same basename will therefore have the same cached
basename.
The default is zero directory levels, meaning different paths with the same
basename will have the same cached basename.
"""

def __init__(self, directory_levels: int = 0):
if directory_levels < 0:
raise ValueError(
"BasenameCacheMapper requires zero or positive directory_levels"
)
self.directory_levels = directory_levels

# Separator for directories when encoded as strings.
self._separator = "_^_"

def __call__(self, path: str) -> str:
return os.path.basename(path)
dirname, basename = os.path.split(path)

if self.directory_levels > 0:
dirs = dirname.split(os.sep)[-self.directory_levels :]
dirs.append(basename)
basename = self._separator.join(dirs)

return basename
martindurant marked this conversation as resolved.
Show resolved Hide resolved

def __eq__(self, other: Any) -> bool:
return super().__eq__(other) and self.directory_levels == other.directory_levels

def __hash__(self) -> int:
return super().__hash__() ^ hash(self.directory_levels)
martindurant marked this conversation as resolved.
Show resolved Hide resolved


class HashCacheMapper(AbstractCacheMapper):
Expand Down
36 changes: 29 additions & 7 deletions fsspec/implementations/cached.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import tempfile
import time
from shutil import rmtree
from typing import Any, ClassVar
from typing import TYPE_CHECKING, Any, ClassVar

from fsspec import AbstractFileSystem, filesystem
from fsspec.callbacks import _DEFAULT_CALLBACK
Expand All @@ -19,6 +19,9 @@
from fsspec.spec import AbstractBufferedFile
from fsspec.utils import infer_compression

if TYPE_CHECKING:
from fsspec.implementations.cache_mapper import AbstractCacheMapper

logger = logging.getLogger("fsspec.cached")


Expand Down Expand Up @@ -53,8 +56,9 @@ def __init__(
expiry_time=604800,
target_options=None,
fs=None,
same_names=False,
same_names: bool | None = None,
compression=None,
cache_mapper: AbstractCacheMapper | None = None,
**kwargs,
):
"""
Expand Down Expand Up @@ -84,13 +88,19 @@ def __init__(
fs: filesystem instance
The target filesystem to run against. Provide this or ``protocol``.
same_names: bool (optional)
By default, target URLs are hashed, so that files from different backends
with the same basename do not conflict. If this is true, the original
basename is used.
By default, target URLs are hashed using a ``HashCacheMapper`` so
that files from different backends with the same basename do not
conflict. If this argument is ``true``, a ``BasenameCacheMapper``
is used instead. Other cache mapper options are available by using
the ``cache_mapper`` keyword argument. Only one of this and
``cache_mapper`` should be specified.
compression: str (optional)
To decompress on download. Can be 'infer' (guess from the URL name),
one of the entries in ``fsspec.compression.compr``, or None for no
decompression.
cache_mapper: AbstractCacheMapper (optional)
The object use to map from original filenames to cached filenames.
Only one of this and ``same_names`` should be specified.
"""
super().__init__(**kwargs)
if fs is None and target_protocol is None:
Expand All @@ -115,7 +125,19 @@ def __init__(
self.check_files = check_files
self.expiry = expiry_time
self.compression = compression
self._mapper = create_cache_mapper(same_names)

if same_names is not None and cache_mapper is not None:
raise ValueError(
"Cannot specify both same_names and cache_mapper in "
"CachingFileSystem.__init__"
)
if cache_mapper is not None:
self._mapper = cache_mapper
else:
self._mapper = create_cache_mapper(
same_names if same_names is not None else False
)

self.target_protocol = (
target_protocol
if isinstance(target_protocol, str)
Expand All @@ -128,7 +150,7 @@ def _strip_protocol(path):
# acts as a method, since each instance has a difference target
return self.fs._strip_protocol(type(self)._strip_protocol(path))

self._strip_protocol = _strip_protocol
self._strip_protocol = _strip_protocol # type: ignore[method-assign]
martindurant marked this conversation as resolved.
Show resolved Hide resolved

def _mkcache(self):
os.makedirs(self.storage[-1], exist_ok=True)
Expand Down
65 changes: 59 additions & 6 deletions fsspec/implementations/tests/test_cached.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
import fsspec
from fsspec.compression import compr
from fsspec.exceptions import BlocksizeMismatchError
from fsspec.implementations.cache_mapper import create_cache_mapper
from fsspec.implementations.cache_mapper import (
BasenameCacheMapper,
HashCacheMapper,
create_cache_mapper,
)
from fsspec.implementations.cached import CachingFileSystem, LocalTempFile
from fsspec.implementations.local import make_path_posix

Expand Down Expand Up @@ -57,9 +61,33 @@ def test_mapper():
assert hash(create_cache_mapper(True)) == hash(mapper0)
assert hash(create_cache_mapper(False)) == hash(mapper1)

with pytest.raises(
ValueError,
match="BasenameCacheMapper requires zero or positive directory_levels",
):
BasenameCacheMapper(-1)

@pytest.mark.parametrize("same_names", [False, True])
def test_metadata(tmpdir, same_names):
mapper2 = BasenameCacheMapper(1)
assert mapper2("/somedir/somefile") == "somedir_^_somefile"
assert mapper2("/otherdir/somefile") == "otherdir_^_somefile"

mapper2 = BasenameCacheMapper(2)
assert mapper2("/somedir/somefile") == "_^_somedir_^_somefile"
assert mapper2("/otherdir/somefile") == "_^_otherdir_^_somefile"

assert mapper2 != mapper0
assert mapper2 != mapper1
assert BasenameCacheMapper(2) == mapper2

assert hash(mapper2) != hash(mapper0)
assert hash(mapper2) != hash(mapper1)
assert hash(BasenameCacheMapper(2)) == hash(mapper2)


@pytest.mark.parametrize(
"cache_mapper", [BasenameCacheMapper(), BasenameCacheMapper(1), HashCacheMapper()]
)
def test_metadata(tmpdir, cache_mapper):
source = os.path.join(tmpdir, "source")
afile = os.path.join(source, "afile")
os.mkdir(source)
Expand All @@ -69,7 +97,7 @@ def test_metadata(tmpdir, same_names):
"filecache",
target_protocol="file",
cache_storage=os.path.join(tmpdir, "cache"),
same_names=same_names,
cache_mapper=cache_mapper,
)

with fs.open(afile, "rb") as f:
Expand All @@ -85,8 +113,33 @@ def test_metadata(tmpdir, same_names):

assert detail["original"] == afile_posix
assert detail["fn"] == fs._mapper(afile_posix)
if same_names:
assert detail["fn"] == "afile"

if isinstance(cache_mapper, BasenameCacheMapper):
if cache_mapper.directory_levels == 0:
assert detail["fn"] == "afile"
else:
assert detail["fn"] == "source_^_afile"


def test_constructor_kwargs(tmpdir):
fs = fsspec.filesystem("filecache", target_protocol="file", same_names=True)
assert isinstance(fs._mapper, BasenameCacheMapper)

fs = fsspec.filesystem("filecache", target_protocol="file", same_names=False)
assert isinstance(fs._mapper, HashCacheMapper)

fs = fsspec.filesystem("filecache", target_protocol="file")
assert isinstance(fs._mapper, HashCacheMapper)

with pytest.raises(
ValueError, match="Cannot specify both same_names and cache_mapper"
):
fs = fsspec.filesystem(
"filecache",
target_protocol="file",
cache_mapper=HashCacheMapper(),
same_names=True,
)


def test_idempotent():
Expand Down
Loading