-
Notifications
You must be signed in to change notification settings - Fork 97
/
memoize.py
174 lines (141 loc) · 5.06 KB
/
memoize.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
from datetime import datetime
import fcntl
from functools import wraps
import os
from osclib.cache_manager import CacheManager
import shelve
import pickle
# Directory where the persistent cache files are stored. memoize() joins this
# path with the decorated function's name to locate that function's cache file
# (and appends '.lck' to the same path for its lock file).
CACHEDIR = CacheManager.directory('memoize')
def memoize(ttl=None, session=False, add_invalidate=False):
    """Decorator factory that caches a function's results for a limited time.

    Every result is stored together with the time it was computed. A stored
    result is reused while it is younger than ``ttl`` seconds; otherwise the
    wrapped function is called again and the entry is replaced.

    Two storage backends exist:

    * persistent (default): a ``shelve`` file named after the function inside
      ``CACHEDIR``. An exclusive ``flock`` on ``<file>.lck`` is held from the
      moment the cache is opened until it is closed, which includes the
      execution of the wrapped function.
    * session (``session=True``): a plain dict stored on the wrapped function
      as ``_memoize_session_cache``. Such functions are recorded in
      ``memoize.session_functions`` so ``memoize_session_reset()`` can empty
      them.

    The cache is bounded: once it holds 4096 entries, the 1024 oldest ones
    (plus any excess over the limit) are removed.

    Cache keys are built from the ``str()`` of the first positional argument
    (``self`` for methods), the remaining positional arguments and the keyword
    arguments, so the latter two must be picklable.

    >>> @memoize(session=True)
    ... def double(a):
    ...     return 2 * a
    >>> double(21)
    42

    Args:
        ttl: Time to live of an entry in seconds. Any falsy value (``None``,
            ``0``) selects the default of two hours.
        session: Use the in-memory backend instead of the shelve file.
        add_invalidate: Treat the first positional argument as the owning
            instance and attach ``_invalidate_<function name>(*args,
            **kwargs)`` and ``_invalidate_all()`` to it on first use. Call
            ``_invalidate_<function name>`` with the same arguments as the
            method (without ``self``) to drop that single entry.

    Returns:
        A decorator that wraps a function with the cache described above.

    Note:
        ``shelve`` is built on ``pickle``; the cache directory must not be
        writable by untrusted parties.
    """
    # Configuration variables
    SLOTS = 4096  # Number of slots in the cache file
    NCLEAN = 1024  # Number of slots to remove when limit reached
    TIMEOUT = 60 * 60 * 2  # Time to live for every cache slot (seconds)

    ttl = ttl if ttl else TIMEOUT

    # Create the registry only once: re-creating it on every decorator
    # application would forget session functions that already registered
    # themselves, and memoize_session_reset() would then skip them.
    if not hasattr(memoize, 'session_functions'):
        memoize.session_functions = []

    def _memoize(fn):
        # The on-disk location is only needed by the persistent backend.
        cache_name = None if session else os.path.join(CACHEDIR, fn.__name__)

        # Implement a POSIX lock / unlock extension for shelves. Inspired
        # on ActiveState Code recipe #576591
        def _lock(filename):
            lckfile = open(filename + '.lck', 'w')
            try:
                fcntl.flock(lckfile.fileno(), fcntl.LOCK_EX)
            except BaseException:
                # Do not leak the descriptor if locking fails or is
                # interrupted while blocked.
                lckfile.close()
                raise
            return lckfile

        def _unlock(lckfile):
            try:
                fcntl.flock(lckfile.fileno(), fcntl.LOCK_UN)
            finally:
                lckfile.close()

        def _open_cache(cache_name):
            if session:
                if not hasattr(fn, '_memoize_session_cache'):
                    fn._memoize_session_cache = {}
                    memoize.session_functions.append(fn)
                return fn._memoize_session_cache

            lckfile = _lock(cache_name)
            try:
                cache = shelve.open(cache_name, protocol=-1)
            except BaseException:
                _unlock(lckfile)
                raise
            # Store a reference to the lckfile so it is not closed by gc
            # (which would release the lock) while the cache is in use.
            cache.lckfile = lckfile
            return cache

        def _close_cache(cache):
            if not session:
                try:
                    cache.close()
                finally:
                    _unlock(cache.lckfile)

        def _clean_cache(cache):
            len_cache = len(cache)
            if len_cache >= SLOTS:
                nclean = NCLEAN + len_cache - SLOTS
                # Entries are (timestamp, value): evict the oldest first.
                keys_to_delete = sorted(cache, key=lambda k: cache[k][0])[:nclean]
                for key in keys_to_delete:
                    del cache[key]

        def _key(obj):
            # Pickle doesn't guarantee that there is a single
            # representation for every serialization. We can try to
            # pickle / unpickle twice to have a canonical
            # representation.
            key = pickle.dumps(obj, protocol=-1)
            key = pickle.dumps(pickle.loads(key), protocol=-1)
            # shelve only accepts str keys on Python 3 (it calls
            # key.encode()), so expose the pickle as a hex string.
            return key.hex()

        def _call_key(args, kwargs):
            # The first positional argument has always been keyed by its
            # str() (isinstance(x, object) is true for every value); calls
            # without positional arguments get None in that slot.
            first = str(args[0]) if args else None
            return _key((first, args[1:], kwargs))

        def _discard(key):
            cache = _open_cache(cache_name)
            try:
                if key in cache:
                    del cache[key]
            finally:
                _close_cache(cache)

        def _invalidate_all():
            cache = _open_cache(cache_name)
            try:
                cache.clear()
            finally:
                _close_cache(cache)

        def _add_invalidate_method(_self):
            name = f'_invalidate_{fn.__name__}'
            if not hasattr(_self, name):
                def _invalidate(*args, **kwargs):
                    # The helper is stored as a plain instance attribute, so
                    # it is called without self; re-add the instance to build
                    # the same key the cached call used.
                    _discard(_call_key((_self,) + args, kwargs))
                setattr(_self, name, _invalidate)

            # NOTE: the name is shared by every memoized method of the
            # instance; the first method called provides it, so it clears
            # only that method's cache.
            name = '_invalidate_all'
            if not hasattr(_self, name):
                setattr(_self, name, _invalidate_all)

        @wraps(fn)
        def _fn(*args, **kwargs):
            now = datetime.now()
            if add_invalidate and args:
                _add_invalidate_method(args[0])

            key = _call_key(args, kwargs)
            cache = _open_cache(cache_name)
            try:
                fresh = False
                if key in cache:
                    timestamp, value = cache[key]
                    fresh = (now - timestamp).total_seconds() < ttl
                if not fresh:
                    value = fn(*args, **kwargs)
                    cache[key] = (now, value)
                _clean_cache(cache)
            finally:
                # Always release the shelf and its lock, even when the
                # wrapped function raises.
                _close_cache(cache)
            return value

        return _fn

    return _memoize
def memoize_session_reset():
    """Reset all session caches.

    Gives every function registered in ``memoize.session_functions`` a new,
    empty ``_memoize_session_cache`` dict. Does nothing when no registry
    exists yet, i.e. when ``memoize()`` has never been applied.
    """
    for fn in getattr(memoize, 'session_functions', ()):
        fn._memoize_session_cache = {}