
Commit

Save cache metadata in JSON format (#1353)
ianthomas23 authored Sep 15, 2023
1 parent 302b7cc · commit d99979d
Showing 2 changed files with 112 additions and 27 deletions.
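In practice this means the metadata file named "cache" in each cache storage directory is written as plain JSON rather than a pickle, while old pickled files remain readable and are rewritten as JSON the next time metadata is saved. A minimal, hypothetical sketch of inspecting the new format (paths are illustrative only; it mirrors the test added below in test_cached.py):

import json
import os

import fsspec

# Hypothetical source file and cache directory, for illustration only.
fs = fsspec.filesystem(
    "filecache", target_protocol="file", cache_storage="/tmp/demo_cache"
)
with fs.open("/tmp/demo_source/afile", "rb") as f:
    f.read()

# After this commit, the metadata file can be read with json rather than pickle.
cache_fn = os.path.join(fs.storage[-1], "cache")
with open(cache_fn, "r") as f:
    metadata = json.load(f)
print(list(metadata.keys()))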
54 changes: 40 additions & 14 deletions fsspec/implementations/cache_metadata.py
@@ -7,6 +7,12 @@

 from fsspec.utils import atomic_write

+try:
+    import ujson as json
+except ImportError:
+    if not TYPE_CHECKING:
+        import json
+
 if TYPE_CHECKING:
     from typing import Any, Dict, Iterator, Literal

@@ -23,7 +29,9 @@ class CacheMetadata:
     All reading and writing of cache metadata is performed by this class,
     accessing the cached files and blocks is not.

-    Metadata is stored in a single file per storage directory, pickled.
+    Metadata is stored in a single file per storage directory in JSON format.
+    For backward compatibility, also reads metadata stored in pickle format
+    which is converted to JSON when next saved.
     """

     def __init__(self, storage: list[str]):
@@ -41,6 +49,28 @@ def __init__(self, storage: list[str]):
         self._storage = storage
         self.cached_files: list[Detail] = [{}]

+        # Private attribute to force saving of metadata in pickle format rather than
+        # JSON for use in tests to confirm can read both pickle and JSON formats.
+        self._force_save_pickle = False
+
+    def _load(self, fn: str) -> Detail:
+        """Low-level function to load metadata from specific file"""
+        try:
+            with open(fn, "r") as f:
+                return json.load(f)
+        except ValueError:
+            with open(fn, "rb") as f:
+                return pickle.load(f)
+
+    def _save(self, metadata_to_save: Detail, fn: str) -> None:
+        """Low-level function to save metadata to specific file"""
+        if self._force_save_pickle:
+            with atomic_write(fn) as f:
+                pickle.dump(metadata_to_save, f)
+        else:
+            with atomic_write(fn, mode="w") as f:
+                json.dump(metadata_to_save, f)
+
     def _scan_locations(
         self, writable_only: bool = False
     ) -> Iterator[tuple[str, str, bool]]:
@@ -111,8 +141,7 @@ def clear_expired(self, expiry_time: int) -> tuple[list[str], bool]:

         if self.cached_files[-1]:
             cache_path = os.path.join(self._storage[-1], "cache")
-            with atomic_write(cache_path) as fc:
-                pickle.dump(self.cached_files[-1], fc)
+            self._save(self.cached_files[-1], cache_path)

         writable_cache_empty = not self.cached_files[-1]
         return expired_files, writable_cache_empty
@@ -122,13 +151,12 @@ def load(self) -> None:
         cached_files = []
         for fn, _, _ in self._scan_locations():
             if os.path.exists(fn):
-                with open(fn, "rb") as f:
-                    # TODO: consolidate blocks here
-                    loaded_cached_files = pickle.load(f)
-                    for c in loaded_cached_files.values():
-                        if isinstance(c["blocks"], list):
-                            c["blocks"] = set(c["blocks"])
-                    cached_files.append(loaded_cached_files)
+                # TODO: consolidate blocks here
+                loaded_cached_files = self._load(fn)
+                for c in loaded_cached_files.values():
+                    if isinstance(c["blocks"], list):
+                        c["blocks"] = set(c["blocks"])
+                cached_files.append(loaded_cached_files)
             else:
                 cached_files.append({})
         self.cached_files = cached_files or [{}]
@@ -170,8 +198,7 @@ def save(self) -> None:
                 continue

             if os.path.exists(fn):
-                with open(fn, "rb") as f:
-                    cached_files = pickle.load(f)
+                cached_files = self._load(fn)
                 for k, c in cached_files.items():
                     if k in cache:
                         if c["blocks"] is True or cache[k]["blocks"] is True:
@@ -197,8 +224,7 @@ def save(self) -> None:
             for c in cache.values():
                 if isinstance(c["blocks"], set):
                     c["blocks"] = list(c["blocks"])
-            with atomic_write(fn) as f:
-                pickle.dump(cache, f)
+            self._save(cache, fn)
         self.cached_files[-1] = cached_files

     def update_file(self, path: str, detail: Detail) -> None:
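A side note on the `blocks` field handled in `load()` and `save()` above: it is kept as a list on disk and converted back to a set in memory, which matters for the new format because `json.dump` cannot serialize Python sets. A standalone sketch of that round trip, not code from the commit:

import json

detail = {"blocks": {0, 1}}

# json.dump rejects sets, so "blocks" is written to disk as a list...
on_disk = json.dumps({**detail, "blocks": list(detail["blocks"])})

# ...and converted back to a set after loading, mirroring load() above.
loaded = json.loads(on_disk)
loaded["blocks"] = set(loaded["blocks"])
assert loaded["blocks"] == {0, 1}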
85 changes: 72 additions & 13 deletions fsspec/implementations/tests/test_cached.py
@@ -1,3 +1,4 @@
+import json
 import os
 import pickle
 import shutil
@@ -111,7 +112,8 @@ def test_mapper():
 @pytest.mark.parametrize(
     "cache_mapper", [BasenameCacheMapper(), BasenameCacheMapper(1), HashCacheMapper()]
 )
-def test_metadata(tmpdir, cache_mapper):
+@pytest.mark.parametrize("force_save_pickle", [True, False])
+def test_metadata(tmpdir, cache_mapper, force_save_pickle):
     source = os.path.join(tmpdir, "source")
     afile = os.path.join(source, "afile")
     os.mkdir(source)
@@ -123,6 +125,7 @@ def test_metadata(tmpdir, cache_mapper):
         cache_storage=os.path.join(tmpdir, "cache"),
         cache_mapper=cache_mapper,
     )
+    fs._metadata._force_save_pickle = force_save_pickle

     with fs.open(afile, "rb") as f:
         assert f.read(5) == b"test"
@@ -145,6 +148,42 @@ def test_metadata(tmpdir, cache_mapper):
     assert detail["fn"] == "source_@_afile"


+def test_metadata_replace_pickle_with_json(tmpdir):
+    # For backward compatibility will allow reading of old pickled metadata.
+    # When the metadata is next saved, it is in json format.
+    source = os.path.join(tmpdir, "source")
+    afile = os.path.join(source, "afile")
+    os.mkdir(source)
+    open(afile, "w").write("test")
+
+    # Save metadata in pickle format, to simulate old metadata
+    fs = fsspec.filesystem(
+        "filecache",
+        target_protocol="file",
+        cache_storage=os.path.join(tmpdir, "cache"),
+    )
+    fs._metadata._force_save_pickle = True
+    with fs.open(afile, "rb") as f:
+        assert f.read(5) == b"test"
+
+    # Confirm metadata is in pickle format
+    cache_fn = os.path.join(fs.storage[-1], "cache")
+    with open(cache_fn, "rb") as f:
+        metadata = pickle.load(f)
+    assert list(metadata.keys()) == [make_path_posix(afile)]
+
+    # Force rewrite of metadata, now in json format
+    fs._metadata._force_save_pickle = False
+    fs.pop_from_cache(afile)
+    with fs.open(afile, "rb") as f:
+        assert f.read(5) == b"test"
+
+    # Confirm metadata is in json format
+    with open(cache_fn, "r") as f:
+        metadata = json.load(f)
+    assert list(metadata.keys()) == [make_path_posix(afile)]
+
+
 def test_constructor_kwargs(tmpdir):
     fs = fsspec.filesystem("filecache", target_protocol="file", same_names=True)
     assert isinstance(fs._mapper, BasenameCacheMapper)
@@ -174,7 +213,8 @@ def test_idempotent():
     assert fs3.storage == fs.storage


-def test_blockcache_workflow(ftp_writable, tmp_path):
+@pytest.mark.parametrize("force_save_pickle", [True, False])
+def test_blockcache_workflow(ftp_writable, tmp_path, force_save_pickle):
     host, port, user, pw = ftp_writable
     fs = FTPFileSystem(host, port, user, pw)
     with fs.open("/out", "wb") as f:
@@ -194,6 +234,7 @@ def test_blockcache_workflow(ftp_writable, tmp_path):

     # Open the blockcache and read a little bit of the data
     fs = fsspec.filesystem("blockcache", **fs_kwargs)
+    fs._metadata._force_save_pickle = force_save_pickle
     with fs.open("/out", "rb", block_size=5) as f:
         assert f.read(5) == b"test\n"

@@ -202,13 +243,18 @@ def test_blockcache_workflow(ftp_writable, tmp_path):
     del fs

     # Check that cache file only has the first two blocks
-    with open(tmp_path / "cache", "rb") as f:
-        cache = pickle.load(f)
-    assert "/out" in cache
-    assert cache["/out"]["blocks"] == [0, 1]
+    if force_save_pickle:
+        with open(tmp_path / "cache", "rb") as f:
+            cache = pickle.load(f)
+    else:
+        with open(tmp_path / "cache", "r") as f:
+            cache = json.load(f)
+    assert "/out" in cache
+    assert cache["/out"]["blocks"] == [0, 1]

     # Reopen the same cache and read some more...
     fs = fsspec.filesystem("blockcache", **fs_kwargs)
+    fs._metadata._force_save_pickle = force_save_pickle
     with fs.open("/out", block_size=5) as f:
         assert f.read(5) == b"test\n"
         f.seek(30)
@@ -292,7 +338,8 @@ def test_clear():
     assert len(os.listdir(cache1)) < 2


-def test_clear_expired(tmp_path):
+@pytest.mark.parametrize("force_save_pickle", [True, False])
+def test_clear_expired(tmp_path, force_save_pickle):
     def __ager(cache_fn, fn, del_fn=False):
         """
         Modify the cache file to virtually add time lag to selected files.
@@ -310,15 +357,23 @@ def __ager(cache_fn, fn, del_fn=False):
         import time

         if os.path.exists(cache_fn):
-            with open(cache_fn, "rb") as f:
-                cached_files = pickle.load(f)
-            fn_posix = pathlib.Path(fn).as_posix()
-            cached_files[fn_posix]["time"] = cached_files[fn_posix]["time"] - 691200
+            if force_save_pickle:
+                with open(cache_fn, "rb") as f:
+                    cached_files = pickle.load(f)
+            else:
+                with open(cache_fn, "r") as f:
+                    cached_files = json.load(f)
+            fn_posix = pathlib.Path(fn).as_posix()
+            cached_files[fn_posix]["time"] = cached_files[fn_posix]["time"] - 691200
             assert os.access(cache_fn, os.W_OK), "Cache is not writable"
             if del_fn:
                 del cached_files[fn_posix]["fn"]
-            with open(cache_fn, "wb") as f:
-                pickle.dump(cached_files, f)
+            if force_save_pickle:
+                with open(cache_fn, "wb") as f:
+                    pickle.dump(cached_files, f)
+            else:
+                with open(cache_fn, "w") as f:
+                    json.dump(cached_files, f)
         time.sleep(1)

     origin = tmp_path.joinpath("origin")
@@ -350,6 +405,7 @@ def __ager(cache_fn, fn, del_fn=False):
     fs = fsspec.filesystem(
         "filecache", target_protocol="file", cache_storage=str(cache1), cache_check=1
     )
+    fs._metadata._force_save_pickle = force_save_pickle
     assert fs.cat(str(f1)) == data

     # populates "last" cache if file not found in first one
@@ -359,6 +415,7 @@ def __ager(cache_fn, fn, del_fn=False):
         cache_storage=[str(cache1), str(cache2)],
         cache_check=1,
     )
+    fs._metadata._force_save_pickle = force_save_pickle
     assert fs.cat(str(f2)) == data
     assert fs.cat(str(f3)) == data
     assert len(os.listdir(cache2)) == 3
@@ -390,6 +447,7 @@ def __ager(cache_fn, fn, del_fn=False):
         same_names=True,
         cache_check=1,
     )
+    fs._metadata._force_save_pickle = force_save_pickle
     assert fs.cat(str(f4)) == data

     cache_fn = os.path.join(fs.storage[-1], "cache")
@@ -406,6 +464,7 @@ def __ager(cache_fn, fn, del_fn=False):
         same_names=True,
         cache_check=1,
     )
+    fs._metadata._force_save_pickle = force_save_pickle
     assert fs.cat(str(f1)) == data

     cache_fn = os.path.join(fs.storage[-1], "cache")
