
Caching of file-set hashes by local path and mtimes #700

Merged: 36 commits merged into master from local-cache-ids on Mar 17, 2024
The diff below shows the changes from 21 of the 36 commits.

Commits (36)
45117ef added code to handle "locally-persistent-ids" (tclose, Feb 24, 2024)
2b7ca50 implemented persistent hash cache to avoid rehashing files (tclose, Feb 24, 2024)
04b95ff touched up persistent_hash_cache test (tclose, Feb 24, 2024)
0c865f4 replaced Cache({}) with Cache() to match new proper class (tclose, Feb 24, 2024)
3b3fdb7 upped resolution of mtime to nanoseconds (tclose, Feb 24, 2024)
81a5108 added sleep to various tests to ensure file mtimes are different (tclose, Feb 24, 2024)
0c4b179 added more sleeps to ensure mtimes of input files are different in tests (tclose, Feb 24, 2024)
615d590 debugged setting hash cache via env var and added clean up of directory (tclose, Feb 24, 2024)
55b660e mock mtime writing instead of adding sleeps to ensure mtimes are diff… (tclose, Feb 24, 2024)
5d51736 undid overzealous black (tclose, Feb 24, 2024)
0421f85 added missing import (tclose, Feb 24, 2024)
a864b32 Adds platformdirs dependency and uses it to store the hash cache within (tclose, Feb 24, 2024)
05ca695 added unittests to hit exceptions in persistentcache init (tclose, Feb 24, 2024)
52ef03f added mkdir to location converter (tclose, Feb 24, 2024)
0216236 debugged mkdir of persistent cache (tclose, Feb 24, 2024)
bad261b bug fixes in persistentcache location init (tclose, Feb 24, 2024)
2fbee2b Revert "mock mtime writing instead of adding sleeps to ensure mtimes …" (tclose, Feb 24, 2024)
91948f0 skip lock files in directory clean up (tclose, Feb 24, 2024)
e058408 fixed clean-up bug (tclose, Feb 24, 2024)
f1ded7a added mock import (tclose, Feb 24, 2024)
bb11067 added another sleep to trigger atime change (tclose, Feb 24, 2024)
a031ea5 implementing @effigies suggestions (tclose, Feb 29, 2024)
f2f70a6 added comments and doc strings to explain the use of the persistent c… (tclose, Feb 29, 2024)
191aa9c touched up comments (tclose, Feb 29, 2024)
3076fea another comment touch up (tclose, Feb 29, 2024)
a094fbc touch up comments again (tclose, Feb 29, 2024)
d27201f Merge branch 'master' into local-cache-ids (tclose, Mar 8, 2024)
291f29f [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Mar 8, 2024)
0a10f6c added in @djarecka's test for moving file cache locations (tclose, Mar 8, 2024)
311e3dd updated cache initialisation (tclose, Mar 8, 2024)
4827365 switched to use blake2b instead of blake2s (tclose, Mar 8, 2024)
b6799b6 [skip ci] deleted already commented-out code (tclose, Mar 8, 2024)
2bb86fe additional doc strings for hash cache objects (tclose, Mar 8, 2024)
1f601e1 added test to see that persistent cache is used in the running of tasks (tclose, Mar 16, 2024)
7e60c41 moved persistent hash cache within "hash_cache" subdirectory of the p… (tclose, Mar 17, 2024)
921979c fixed import issue (tclose, Mar 17, 2024)
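In brief, this PR keys each FileSet's hash by its local file paths and nanosecond mtimes and memoizes the result in an on-disk cache, so unchanged files are not rehashed between task and workflow runs. A minimal sketch of the behaviour, assuming the fileformats File type and the PYDRA_HASH_CACHE environment variable introduced in the diff below (the paths are hypothetical):

import os
from pathlib import Path

# Assumption for illustration: redirect the persistent hash cache from its
# platformdirs default to a scratch directory via the env var added in this PR
os.environ["PYDRA_HASH_CACHE"] = "/tmp/pydra-hash-cache"

from fileformats.generic import File  # a FileSet subclass, used for illustration
from pydra.utils.hash import hash_object

data = Path("/tmp/data.txt")  # hypothetical input file
data.write_text("hello")

first = hash_object(File(data))   # cache miss: file contents are read and hashed
second = hash_object(File(data))  # cache hit: keyed by (path, mtime), no rehash
assert first == second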
2 changes: 2 additions & 0 deletions pydra/engine/submitter.py
@@ -6,6 +6,7 @@
from .workers import WORKERS
from .core import is_workflow
from .helpers import get_open_loop, load_and_run_async
+from ..utils.hash import PersistentCache

import logging

@@ -43,6 +44,7 @@ def __call__(self, runnable, cache_locations=None, rerun=False, environment=None
        self.loop.run_until_complete(
            self.submit_from_call(runnable, rerun, environment)
        )
+        PersistentCache().clean_up()
        return runnable.result()

    async def submit_from_call(self, runnable, rerun, environment):
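The clean_up() call above runs once per top-level submission so stale hash files do not accumulate. A sketch of tuning the retention window with the PYDRA_HASH_CACHE_CLEANUP_PERIOD variable defined in pydra/utils/hash.py below; the 7-day value is an arbitrary example:

import os

# Arbitrary example value: expire hash files not accessed within 7 days
os.environ["PYDRA_HASH_CACHE_CLEANUP_PERIOD"] = "7"

from pydra.utils.hash import PersistentCache

cache = PersistentCache()  # location and cleanup_period fall back to env vars, then defaults
cache.clean_up()           # unlinks non-.lock files whose atime is older than the period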
3 changes: 3 additions & 0 deletions pydra/engine/tests/test_node_task.py
@@ -2,7 +2,9 @@
import shutil
import attr
import numpy as np
+from unittest import mock
import pytest
+import time

from .utils import (
fun_addtwo,
@@ -320,6 +322,7 @@ def test_task_init_7(tmp_path):
    output_dir1 = nn1.output_dir

    # changing the content of the file
+    time.sleep(2)  # need the mtime to be different
    file2 = tmp_path / "file2.txt"
    with open(file2, "w") as f:
        f.write("from pydra")
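These time.sleep(2) calls exist because the new cache key includes each file's mtime: rewriting a file within the filesystem's timestamp resolution would otherwise return the stale cached hash (the PR tried mocking mtimes in 55b660e before reverting in 2fbee2b). For illustration only, a test could instead advance the mtime explicitly:

import os
from pathlib import Path

file2 = Path("/tmp/file2.txt")  # hypothetical test file
file2.write_text("from pydra")
st = file2.stat()
# Bump mtime by 1 ns so the (path, mtime) hash-cache key is guaranteed to change
os.utime(file2, ns=(st.st_atime_ns, st.st_mtime_ns + 1))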
6 changes: 6 additions & 0 deletions pydra/engine/tests/test_specs.py
@@ -3,6 +3,7 @@
import os
import attrs
from copy import deepcopy
+import time

from ..specs import (
BaseSpec,
@@ -163,6 +164,7 @@ def test_input_file_hash_2(tmp_path):
    assert hash1 == hash2

    # checking if different content (the same name) affects the hash
+    time.sleep(2)  # ensure mtime is different
    file_diffcontent = tmp_path / "in_file_1.txt"
    with open(file_diffcontent, "w") as f:
        f.write("hi")
@@ -193,6 +195,7 @@ def test_input_file_hash_2a(tmp_path):
    assert hash1 == hash2

    # checking if different content (the same name) affects the hash
+    time.sleep(2)  # ensure mtime is different
    file_diffcontent = tmp_path / "in_file_1.txt"
    with open(file_diffcontent, "w") as f:
        f.write("hi")
@@ -234,6 +237,7 @@ def test_input_file_hash_3(tmp_path):
    # assert id(files_hash1["in_file"][filename]) == id(files_hash2["in_file"][filename])

    # recreating the file
+    time.sleep(2)  # ensure mtime is different
    with open(file, "w") as f:
        f.write("hello")

@@ -288,6 +292,7 @@ def test_input_file_hash_4(tmp_path):
    assert hash1 == hash2

    # checking if different content (the same name) affects the hash
+    time.sleep(2)  # need the mtime to be different
    file_diffcontent = tmp_path / "in_file_1.txt"
    with open(file_diffcontent, "w") as f:
        f.write("hi")
@@ -324,6 +329,7 @@ def test_input_file_hash_5(tmp_path):
    assert hash1 == hash2

    # checking if different content (the same name) affects the hash
+    time.sleep(2)  # ensure mtime is different
    file_diffcontent = tmp_path / "in_file_1.txt"
    with open(file_diffcontent, "w") as f:
        f.write("hi")
193 changes: 176 additions & 17 deletions pydra/utils/hash.py
@@ -1,24 +1,26 @@
"""Generic object hashing dispatch"""

import os

# import stat
import struct
from datetime import datetime
import typing as ty
from pathlib import Path
from collections.abc import Mapping
from functools import singledispatch
-from hashlib import blake2b
+from hashlib import blake2b, blake2s
import logging

# from pathlib import Path
from typing import (
Dict,
Iterator,
NewType,
Sequence,
Set,
)
from filelock import SoftFileLock
import platformdirs
import attrs.exceptions
from fileformats.core import FileSet
from pydra._version import __version__

logger = logging.getLogger("pydra")

@@ -52,19 +54,144 @@
)

Hash = NewType("Hash", bytes)
-Cache = NewType("Cache", Dict[int, Hash])
+CacheKey = NewType("CacheKey", ty.Tuple[ty.Hashable, ty.Hashable])


def location_converter(path: ty.Union[Path, str, None]) -> Path:
if path is None:
path = PersistentCache.location_default()
path = Path(path)
if not path.exists():
path.mkdir(parents=True)
return path


@attrs.define
class PersistentCache:
"""Persistent cache in which to store computationally expensive hashes between nodes
and workflow/task runs

Parameters
----------
location: Path
the directory in which to store the hashes cache
"""

location: Path = attrs.field(converter=location_converter) # type: ignore[misc]
cleanup_period: int = attrs.field()
_hashes: ty.Dict[CacheKey, Hash] = attrs.field(factory=dict)

    # Environment variables for setting the location of the persistent hash cache
    # and its cleanup period
LOCATION_ENV_VAR = "PYDRA_HASH_CACHE"
CLEANUP_ENV_VAR = "PYDRA_HASH_CACHE_CLEANUP_PERIOD"

@classmethod
def location_default(cls):
try:
location = os.environ[cls.LOCATION_ENV_VAR]
except KeyError:
location = platformdirs.user_cache_dir(
appname="pydra",
appauthor="nipype",
version=__version__,
)
return location

# the default needs to be an instance method
@location.default
def _location_default(self):
return self.location_default()

@location.validator
def location_validator(self, _, location):
if not os.path.isdir(location):
raise ValueError(
f"Persistent cache location '{location}' is not a directory"
)

@cleanup_period.default
def cleanup_period_default(self):
return int(os.environ.get(self.CLEANUP_ENV_VAR, 30))

def get_or_calculate_hash(self, key: CacheKey, calculate_hash: ty.Callable) -> Hash:
"""Check whether key is present in the persistent cache store and return it if so.
Otherwise use `calculate_hash` to generate the hash and save it in the persistent
store.

Parameters
----------
key : CacheKey
            key that is unique to the local host (e.g. file paths and mtimes) used to look up the corresponding hash
in the persistent store
calculate_hash : ty.Callable
function to calculate the hash if it isn't present in the persistent store

Returns
-------
Hash
            the hash corresponding to the key
"""
try:
return self._hashes[key]
except KeyError:
pass
key_path = self.location / blake2s(str(key).encode()).hexdigest()
with SoftFileLock(key_path.with_suffix(".lock")):
if key_path.exists():
return Hash(key_path.read_bytes())
hsh = calculate_hash()
key_path.write_bytes(hsh)
return Hash(hsh)

def clean_up(self):
"""Cleans up old hash caches that haven't been accessed in the last 30 days"""
now = datetime.now()
for path in self.location.iterdir():
if path.name.endswith(".lock"):
continue
days = (now - datetime.fromtimestamp(path.lstat().st_atime)).days
if days > self.cleanup_period:
path.unlink()

@classmethod
def from_path(
cls, path: ty.Union[Path, str, "PersistentCache", None]
) -> "PersistentCache":
if isinstance(path, PersistentCache):
return path
return PersistentCache(path)


@attrs.define
class Cache:
persistent: ty.Optional[PersistentCache] = attrs.field(
default=None,
converter=PersistentCache.from_path, # type: ignore[misc]
)
_hashes: ty.Dict[int, Hash] = attrs.field(factory=dict)

def __getitem__(self, object_id: int) -> Hash:
return self._hashes[object_id]

def __setitem__(self, object_id: int, hsh: Hash):
self._hashes[object_id] = hsh

def __contains__(self, object_id):
return object_id in self._hashes


class UnhashableError(ValueError):
"""Error for objects that cannot be hashed"""


-def hash_function(obj):
+def hash_function(obj, **kwargs):
    """Generate hash of object."""
-    return hash_object(obj).hex()
+    return hash_object(obj, **kwargs).hex()


-def hash_object(obj: object) -> Hash:
+def hash_object(
+    obj: object, persistent_cache: ty.Union[PersistentCache, Path, None] = None
+) -> Hash:
"""Hash an object

Constructs a byte string that uniquely identifies the object,
@@ -74,9 +201,9 @@
dicts. Custom types can be registered with :func:`register_serializer`.
"""
    try:
-        return hash_single(obj, Cache({}))
+        return hash_single(obj, Cache(persistent=persistent_cache))
    except Exception as e:
-        raise UnhashableError(f"Cannot hash object {obj!r}") from e
+        raise UnhashableError(f"Cannot hash object {obj!r} due to '{e}'") from e

[Codecov warning: added line pydra/utils/hash.py#L206 was not covered by tests]


def hash_single(obj: object, cache: Cache) -> Hash:
@@ -89,11 +216,31 @@
if objid not in cache:
# Handle recursion by putting a dummy value in the cache
cache[objid] = Hash(b"\x00")
-        h = blake2b(digest_size=16, person=b"pydra-hash")
-        for chunk in bytes_repr(obj, cache):
-            h.update(chunk)
-        hsh = cache[objid] = Hash(h.digest())
bytes_it = bytes_repr(obj, cache)
# Pop first element from the bytes_repr iterator and check whether it is a
# "local cache key" (e.g. file-system path + mtime tuple) or the first bytes
# chunk

def calc_hash(first: ty.Optional[bytes] = None) -> Hash:
h = blake2b(digest_size=16, person=b"pydra-hash")
if first is not None:
h.update(first)
for chunk in bytes_it: # Note that `bytes_it` is in outer scope
h.update(chunk)
return Hash(h.digest())

first = next(bytes_it)
if isinstance(first, tuple):
tp = type(obj)
key = (
tp.__module__,
tp.__name__,
) + first
hsh = cache.persistent.get_or_calculate_hash(key, calc_hash)
else:
hsh = calc_hash(first=first)
logger.debug("Hash of %s object is %s", obj, hsh)
cache[objid] = hsh
return cache[objid]


@@ -258,6 +405,18 @@
yield b")"


@register_serializer(FileSet)
def bytes_repr_fileset(
fileset: FileSet, cache: Cache
) -> Iterator[ty.Union[CacheKey, bytes]]:
fspaths = sorted(fileset.fspaths)
yield CacheKey(
tuple(repr(p) for p in fspaths) # type: ignore[arg-type]
+ tuple(p.lstat().st_mtime_ns for p in fspaths)
)
yield from fileset.__bytes_repr__(cache)


@register_serializer(list)
@register_serializer(tuple)
def bytes_repr_seq(obj: Sequence, cache: Cache) -> Iterator[bytes]:
@@ -282,7 +441,7 @@
.. code-block:: python

>>> from pydra.utils.hash import bytes_repr_mapping_contents, Cache
-        >>> generator = bytes_repr_mapping_contents({"a": 1, "b": 2}, Cache({}))
+        >>> generator = bytes_repr_mapping_contents({"a": 1, "b": 2}, Cache())
>>> b''.join(generator)
b'str:1:a=...str:1:b=...'
"""
@@ -300,7 +459,7 @@
.. code-block:: python

>>> from pydra.utils.hash import bytes_repr_sequence_contents, Cache
-        >>> generator = bytes_repr_sequence_contents([1, 2], Cache({}))
+        >>> generator = bytes_repr_sequence_contents([1, 2], Cache())
>>> list(generator)
[b'\x6d...', b'\xa3...']
"""
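As bytes_repr_fileset above shows, a serializer opts into the persistent cache by yielding a CacheKey tuple before its byte chunks; hash_single then prefixes the key with the object's module and type name and defers to PersistentCache.get_or_calculate_hash. A sketch of a hypothetical custom type following the same protocol (MyBlob and bytes_repr_myblob are invented for illustration; register_serializer is the registration hook named in the hash_object docstring):

import typing as ty
from pathlib import Path

from pydra.utils.hash import Cache, CacheKey, register_serializer


class MyBlob:
    """Hypothetical wrapper around one large file on disk."""

    def __init__(self, fspath: ty.Union[str, Path]):
        self.fspath = Path(fspath)


@register_serializer(MyBlob)
def bytes_repr_myblob(
    blob: MyBlob, cache: Cache
) -> ty.Iterator[ty.Union[CacheKey, bytes]]:
    # Yield the locally-unique key first; hash_single consults the persistent
    # store and skips reading the file when path and mtime are unchanged
    yield CacheKey((repr(blob.fspath), blob.fspath.lstat().st_mtime_ns))
    yield blob.fspath.read_bytes()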