From 67d425e3e7683a2846853aef0f0a960bd8e8f706 Mon Sep 17 00:00:00 2001 From: Charles Tapley Hoyt Date: Tue, 17 Nov 2020 12:35:25 +0100 Subject: [PATCH] Add "in memory" backend Closes #6 Still needs unit tests Update memory_core.py Add tests for memory core @shaypal5 these are copied from the pickle tests, the only one failing is test_memory_being_calculated. I could use your help with the implementation of the relevant function becuase I don't think I understand the threading part --- cachier/core.py | 6 +- cachier/memory_core.py | 67 ++++++++++ tests/test_memory_core.py | 270 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 339 insertions(+), 4 deletions(-) create mode 100644 cachier/memory_core.py create mode 100644 tests/test_memory_core.py diff --git a/cachier/core.py b/cachier/core.py index 76b21b27..baf1a413 100644 --- a/cachier/core.py +++ b/cachier/core.py @@ -20,6 +20,7 @@ from .pickle_core import _PickleCore from .mongo_core import _MongoCore, RecalculationNeeded +from .memory_core import _MemoryCore MAX_WORKERS_ENVAR_NAME = 'CACHIER_MAX_WORKERS' @@ -157,10 +158,7 @@ def cachier( raise MissingMongetter('must specify ``mongetter`` when using the mongo core') core = _MongoCore(mongetter, stale_after, next_time, wait_for_calc_timeout) elif backend == 'memory': - raise NotImplementedError( - 'An in-memory backend has not yet been implemented. ' - 'Please see https://github.com/shaypal5/cachier/issues/6' - ) + core = _MemoryCore(stale_after=stale_after, next_time=next_time) elif backend == 'redis': raise NotImplementedError( 'A Redis backend has not yet been implemented. ' diff --git a/cachier/memory_core.py b/cachier/memory_core.py new file mode 100644 index 00000000..fc15f51d --- /dev/null +++ b/cachier/memory_core.py @@ -0,0 +1,67 @@ +"""A memory-based caching core for cachier.""" + +from collections import defaultdict +from datetime import datetime + +from .base_core import _BaseCore + + +class _MemoryCore(_BaseCore): + """The pickle core class for cachier. + + Parameters + ---------- + stale_after : datetime.timedelta, optional + See :class:`_BaseCore` documentation. + next_time : bool, optional + See :class:`_BaseCore` documentation. + """ + + def __init__(self, stale_after, next_time): + super().__init__(stale_after=stale_after, next_time=next_time) + self.cache = {} + + def get_entry_by_key(self, key, reload=False): # pylint: disable=W0221 + return key, self.cache.get(key, None) + + def get_entry(self, args, kwds, hash_params): + key = args + tuple(sorted(kwds.items())) if hash_params is None else hash_params(args, kwds) + return self.get_entry_by_key(key) + + def set_entry(self, key, func_res): + self.cache[key] = { + 'value': func_res, + 'time': datetime.now(), + 'stale': False, + 'being_calculated': False, + } + + def mark_entry_being_calculated(self, key): + try: + self.cache[key]['being_calculated'] = True + except KeyError: + self.cache[key] = { + 'value': None, + 'time': datetime.now(), + 'stale': False, + 'being_calculated': True, + } + + def mark_entry_not_calculated(self, key): + try: + self.cache[key]['being_calculated'] = False + except KeyError: + pass # that's ok, we don't need an entry in that case + + def wait_on_entry_calc(self, key): + entry = self.cache[key] + # I don't think waiting is necessary for this one + # if not entry['being_calculated']: + return entry['value'] + + def clear_cache(self): + self.cache.clear() + + def clear_being_calculated(self): + for value in self.cache.values(): + value['being_calculated'] = False diff --git a/tests/test_memory_core.py b/tests/test_memory_core.py new file mode 100644 index 00000000..a63358fc --- /dev/null +++ b/tests/test_memory_core.py @@ -0,0 +1,270 @@ +"""Test for the in-memory implementation of the Cachier python package.""" + +import hashlib +import queue +import threading +from datetime import timedelta +from random import random +from time import sleep, time + +import pandas as pd + +from cachier import cachier + + +@cachier(backend='memory', next_time=False) +def _takes_5_seconds(arg_1, arg_2): + """Some function.""" + sleep(5) + return 'arg_1:{}, arg_2:{}'.format(arg_1, arg_2) + + +def test_memory_core(): + """Basic memory core functionality.""" + _takes_5_seconds.clear_cache() + _takes_5_seconds('a', 'b') + start = time() + _takes_5_seconds('a', 'b', verbose_cache=True) + end = time() + assert end - start < 1 + _takes_5_seconds.clear_cache() + + +SECONDS_IN_DELTA = 3 +DELTA = timedelta(seconds=SECONDS_IN_DELTA) + + +@cachier(backend='memory', stale_after=DELTA, next_time=False) +def _stale_after_seconds(arg_1, arg_2): + """Some function.""" + return random() + + +def test_stale_after(): + """Testing the stale_after functionality.""" + _stale_after_seconds.clear_cache() + val1 = _stale_after_seconds(1, 2) + val2 = _stale_after_seconds(1, 2) + val3 = _stale_after_seconds(1, 3) + assert val1 == val2 + assert val1 != val3 + sleep(3) + val4 = _stale_after_seconds(1, 2) + assert val4 != val1 + _stale_after_seconds.clear_cache() + + +@cachier(backend='memory', stale_after=DELTA, next_time=True) +def _stale_after_next_time(arg_1, arg_2): + """Some function.""" + return random() + + +def test_stale_after_next_time(): + """Testing the stale_after with next_time functionality.""" + _stale_after_next_time.clear_cache() + val1 = _stale_after_next_time(1, 2) + val2 = _stale_after_next_time(1, 2) + val3 = _stale_after_next_time(1, 3) + assert val1 == val2 + assert val1 != val3 + sleep(SECONDS_IN_DELTA + 1) + val4 = _stale_after_next_time(1, 2) + assert val4 == val1 + sleep(0.5) + val5 = _stale_after_next_time(1, 2) + assert val5 != val1 + _stale_after_next_time.clear_cache() + + +@cachier(backend='memory') +def _random_num(): + return random() + + +@cachier(backend='memory') +def _random_num_with_arg(a): + # print(a) + return random() + + +def test_overwrite_cache(): + """Tests that the overwrite feature works correctly.""" + _random_num.clear_cache() + int1 = _random_num() + int2 = _random_num() + assert int2 == int1 + int3 = _random_num(overwrite_cache=True) + assert int3 != int1 + int4 = _random_num() + assert int4 == int3 + _random_num.clear_cache() + + _random_num_with_arg.clear_cache() + int1 = _random_num_with_arg('a') + int2 = _random_num_with_arg('a') + assert int2 == int1 + int3 = _random_num_with_arg('a', overwrite_cache=True) + assert int3 != int1 + int4 = _random_num_with_arg('a') + assert int4 == int3 + _random_num_with_arg.clear_cache() + + +def test_ignore_cache(): + """Tests that the ignore_cache feature works correctly.""" + _random_num.clear_cache() + int1 = _random_num() + int2 = _random_num() + assert int2 == int1 + int3 = _random_num(ignore_cache=True) + assert int3 != int1 + int4 = _random_num() + assert int4 != int3 + assert int4 == int1 + _random_num.clear_cache() + + _random_num_with_arg.clear_cache() + int1 = _random_num_with_arg('a') + int2 = _random_num_with_arg('a') + assert int2 == int1 + int3 = _random_num_with_arg('a', ignore_cache=True) + assert int3 != int1 + int4 = _random_num_with_arg('a') + assert int4 != int3 + assert int4 == int1 + _random_num_with_arg.clear_cache() + + +@cachier(backend='memory') +def _takes_time(arg_1, arg_2): + """Some function.""" + sleep(2) # this has to be enough time for check_calculation to run twice + return random() + arg_1 + arg_2 + + +def _calls_takes_time(res_queue): + res = _takes_time(0.13, 0.02) + res_queue.put(res) + + +def test_memory_being_calculated(): + """Testing memory core handling of being calculated scenarios.""" + _takes_time.clear_cache() + res_queue = queue.Queue() + thread1 = threading.Thread(target=_calls_takes_time, kwargs={'res_queue': res_queue}) + thread2 = threading.Thread(target=_calls_takes_time, kwargs={'res_queue': res_queue}) + thread1.start() + sleep(0.5) + thread2.start() + thread1.join() + thread2.join() + assert res_queue.qsize() == 2 + res1 = res_queue.get() + res2 = res_queue.get() + assert res1 == res2 + + +@cachier(backend='memory', stale_after=timedelta(seconds=1), next_time=True) +def _being_calc_next_time(arg_1, arg_2): + """Some function.""" + sleep(1) + return random() + arg_1 + arg_2 + + +def _calls_being_calc_next_time(res_queue): + res = _being_calc_next_time(0.13, 0.02) + res_queue.put(res) + + +def test_being_calc_next_time(): + """Testing memory core handling of being calculated scenarios.""" + _takes_time.clear_cache() + _being_calc_next_time(0.13, 0.02) + sleep(1.1) + res_queue = queue.Queue() + thread1 = threading.Thread( + target=_calls_being_calc_next_time, kwargs={'res_queue': res_queue}) + thread2 = threading.Thread( + target=_calls_being_calc_next_time, kwargs={'res_queue': res_queue}) + thread1.start() + sleep(0.5) + thread2.start() + thread1.join() + thread2.join() + assert res_queue.qsize() == 2 + res1 = res_queue.get() + res2 = res_queue.get() + assert res1 == res2 + + +@cachier(backend='memory') +def _bad_cache(arg_1, arg_2): + """Some function.""" + sleep(1) + return random() + arg_1 + arg_2 + + +@cachier(backend='memory') +def _delete_cache(arg_1, arg_2): + """Some function.""" + sleep(1) + return random() + arg_1 + arg_2 + + +def test_clear_being_calculated(): + """Test memory core clear `being calculated` functionality.""" + _takes_time.clear_being_calculated() + + +@cachier(backend='memory', stale_after=timedelta(seconds=1), next_time=True) +def _error_throwing_func(arg1): + if not hasattr(_error_throwing_func, 'count'): + _error_throwing_func.count = 0 + _error_throwing_func.count += 1 + if _error_throwing_func.count > 1: + raise ValueError("Tiny Rick!") + return 7 + + +def test_error_throwing_func(): + # with + res1 = _error_throwing_func(4) + sleep(1.5) + res2 = _error_throwing_func(4) + assert res1 == res2 + + +def test_callable_hash_param(): + def _hash_params(args, kwargs): + def _hash(obj): + if isinstance(obj, pd.core.frame.DataFrame): + return hashlib.sha256(pd.util.hash_pandas_object(obj).values.tobytes()).hexdigest() + return obj + + k_args = tuple(map(_hash, args)) + k_kwargs = tuple(sorted({k: _hash(v) for k, v in kwargs.items()}.items())) + return k_args + k_kwargs + + @cachier(backend='memory', hash_params=_hash_params) + def _params_with_dataframe(*args, **kwargs): + """Some function.""" + return random() + + _params_with_dataframe.clear_cache() + + df_a = pd.DataFrame.from_dict(dict(a=[0], b=[2], c=[3])) + df_b = pd.DataFrame.from_dict(dict(a=[0], b=[2], c=[3])) + value_a = _params_with_dataframe(df_a, 1) + value_b = _params_with_dataframe(df_b, 1) + + assert value_a == value_b # same content --> same key + + value_a = _params_with_dataframe(1, df=df_a) + value_b = _params_with_dataframe(1, df=df_b) + + assert value_a == value_b # same content --> same key + + +if __name__ == '__main__': + test_memory_being_calculated()