
Commit 9c1d587

Fix/cache maxsize + tag override for artifacts/tasks + proxy issue (Netflix#327)
* Only take the first forwarded host/proto into account

* Fix tag override for artifacts/tasks

* Fix the cache exceeding its set size

  There were two main issues with the previous implementation:

  - File sizes were counted in bytes, which is not a good measure for a
    large number of small files, a situation that can happen when the cache
    is dealing with artifacts (lots of small ones). The fix is to reason in
    terms of filesystem blocks instead.

  - A file could be marked as deletable and then be re-created, only to be
    deleted again. This fix addresses that situation.

* Added an expiration to the pg pools to recycle connections

* No exception available, so we have to use echo (based on the code
  convention here)

* Actually, the convention seems to be "warn(None, ...)"

Co-authored-by: Jackie Tung <jackie@outerbounds.co>
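To put the block-size point in concrete terms (an illustrative figure, assuming a typical 4096-byte block size): 10,000 one-byte artifacts occupy roughly 40 MB on disk, while a plain byte count reports only about 10 KB, so a cache sized by byte count can exceed its disk budget by orders of magnitude.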
1 parent 94855d3 commit 9c1d587

5 files changed: +68 lines, -36 lines


services/data/postgres_async_db.py

Lines changed: 1 addition & 0 deletions
@@ -80,6 +80,7 @@ async def _init(self, db_conf: DBConfiguration, create_triggers=DB_TRIGGER_CREAT
             minsize=db_conf.pool_min,
             maxsize=db_conf.pool_max,
             timeout=db_conf.timeout,
+            pool_recycle=10 * db_conf.timeout,
             echo=AIOPG_ECHO)

         for table in self.tables:
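
For reference, pool_recycle in aiopg.create_pool closes and re-establishes connections older than the given number of seconds, which keeps a long-lived pool from reusing connections that the server or an intermediate proxy has silently dropped. A minimal, self-contained sketch of the same setting (the DSN and values are illustrative, not taken from this repo):

    import asyncio
    import aiopg

    async def main():
        # Hypothetical DSN; the pool settings mirror the shape of the code above.
        dsn = "dbname=metaflow user=postgres host=localhost"
        timeout = 60
        pool = await aiopg.create_pool(
            dsn,
            minsize=1,
            maxsize=10,
            timeout=timeout,
            # Recycle connections after 10x the operation timeout,
            # as in this commit.
            pool_recycle=10 * timeout,
            echo=False)
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT 1")
                print(await cur.fetchone())
        pool.close()
        await pool.wait_closed()

    asyncio.run(main())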

services/ui_backend_service/api/ws.py

Lines changed: 2 additions & 2 deletions
@@ -213,10 +213,10 @@ async def get_table_postprocessor(self, table_name):
         refiner_postprocess = None
         table = None
         if table_name == self.db.task_table_postgres.table_name:
-            table = self.db.task_table_postgres
+            table = self.db.run_table_postgres
             refiner_postprocess = self.task_refiner.postprocess
         elif table_name == self.db.artifact_table_postgres.table_name:
-            table = self.db.artifact_table_postgres
+            table = self.db.run_table_postgres
             refiner_postprocess = self.artifact_refiner.postprocess

         if table:

services/ui_backend_service/data/cache/client/cache_server.py

Lines changed: 3 additions & 0 deletions
@@ -294,6 +294,9 @@ def _error_callback(self, worker, res):
 def cli(root=None,
         max_actions=None,
         max_size=None):
+    # NOTE: The store will only be accessed by this process. The processes
+    # in the pool never touch the store itself. This is done in the __init__ and
+    # terminate methods in Worker which all happen in this process.
     store = CacheStore(root, max_size, echo)
     Scheduler(store, max_actions).loop()

services/ui_backend_service/data/cache/client/cache_store.py

Lines changed: 59 additions & 34 deletions
@@ -1,3 +1,4 @@
+import math
 import os
 import time
 import shutil
@@ -28,7 +29,7 @@ def makedirs(path):


 def key_filename(key):
-    return hashlib.sha1(key.encode('utf-8')).hexdigest()
+    return hashlib.sha1(key.encode("utf-8")).hexdigest()


 def object_path(root, key):
@@ -38,7 +39,7 @@ def object_path(root, key):


 def stream_path(root, key):
-    return object_path(root, key) + '.stream'
+    return object_path(root, key) + ".stream"


 def is_safely_readable(path):
@@ -50,11 +51,17 @@ is_safely_readable(path):


 def filesize(path):
-    return os.stat(path).st_size
+    try:
+        blk_sz = os.statvfs(path).f_bsize
+        sz = os.stat(path).st_size
+        if sz == 0:
+            return blk_sz
+        return blk_sz * math.ceil(sz / blk_sz)
+    except Exception:
+        return None


 class CacheStore(object):
-
     def __init__(self, root, max_size, echo, fill_factor=0.8):
         self.root = os.path.abspath(root)
         self.tmproot = self._init_temp(self.root)
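
The new filesize() charges every object at least one filesystem block and rounds larger objects up to whole blocks, which is what the file actually costs on disk. A quick demonstration of the same computation (statvfs is POSIX-only, hence the broad try/except in the diff); the helper below simply restates the logic for illustration, using max(1, ...) as an equivalent form of the zero-size special case:

    import math
    import os
    import tempfile

    def filesize(path):
        # Charge whole filesystem blocks, never less than one block per file,
        # mirroring the function added in this commit.
        try:
            blk_sz = os.statvfs(path).f_bsize
            sz = os.stat(path).st_size
            return blk_sz * max(1, math.ceil(sz / blk_sz))
        except Exception:
            return None

    with tempfile.NamedTemporaryFile(delete=False) as f:
        f.write(b"x")  # a 1-byte file
        path = f.name

    print(os.stat(path).st_size)  # 1
    print(filesize(path))         # typically 4096: one full block
    os.unlink(path)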
@@ -79,7 +86,7 @@ def warn(self, ex, msg):
         self.echo("IO ERROR: (%s) %s" % (ex, msg))

     def _init_temp(self, root):
-        tmproot = os.path.join(root, 'tmp')
+        tmproot = os.path.join(root, "tmp")
         if os.path.exists(tmproot):
             self.safe_fileop(shutil.rmtree, tmproot)
         makedirs(tmproot)
@@ -94,18 +101,23 @@ def _init_gc(self, root):
                     self.safe_fileop(os.unlink, path)
                 else:
                     stat_res = self.safe_fileop(os.stat, path)
-                    if stat_res:
+                    sz = filesize(path)
+                    if stat_res and sz is not None:
                         _, info = stat_res
                         if info.st_mtime == TIMESTAMP_FOR_DELETABLE:
-                            self.safe_fileop(os.unlink, path)
+                            # If we can't unlink it, we add it to the gc_queue
+                            # to try at least another time
+                            if not self.safe_fileop(os.unlink, path):
+                                self.gc_queue[path] = (time.time(), sz)
                         elif info.st_mtime == TIMESTAMP_FOR_DISPOSABLE:
-                            self.disposables_queue[path] = info.st_size
+                            self.disposables_queue[path] = sz
                         else:
-                            objects.append((info.st_mtime, (path, info.st_size)))
+                            objects.append((info.st_mtime, (path, sz)))

         self.objects_queue.update(x for _, x in sorted(objects))
-        self.total_size = sum(self.disposables_queue.values()) +\
-            sum(self.objects_queue.values())
+        self.total_size = sum(self.disposables_queue.values()) + sum(
+            self.objects_queue.values()
+        )

         # It is possible that the datastore contains more than gc_watermark
         # bytes. To ensure that we start below the gc_watermark, we run the GC:
@@ -115,17 +127,19 @@ def _init_gc(self, root):
         # yet, so we can safely delete the marked objects without a quarantine:
         self._gc_objects(quarantine=-1)

-        self.echo("Cache initialized with %d permanents objects, "
-                  "%d disposable objects, totaling %d bytes."
-                  % (len(self.objects_queue),
-                     len(self.disposables_queue),
-                     self.total_size))
+        self.warn(
+            None,
+            "Cache initialized with %d permanents objects, "
+            "%d disposable objects, totaling %d bytes."
+            % (len(self.objects_queue), len(self.disposables_queue), self.total_size)
+        )

     def _gc_objects(self, quarantine=GC_MARKER_QUARANTINE):
         def mark_for_deletion(path, size):
-            self.safe_fileop(os.utime, path, (TIMESTAMP_FOR_DELETABLE,
-                                              TIMESTAMP_FOR_DELETABLE))
-            self.gc_queue[path] = (time.time(), size)
+            if self.safe_fileop(
+                os.utime, path, (TIMESTAMP_FOR_DELETABLE, TIMESTAMP_FOR_DELETABLE)
+            ):
+                self.gc_queue[path] = (time.time(), size)

         # 1) delete marked objects that are past their quarantine period
         limit = time.time() - quarantine
@@ -135,6 +149,13 @@ def mark_for_deletion(path, size):
                 if self.safe_fileop(os.unlink, path):
                     del self.gc_queue[path]
                     self.total_size -= size
+                else:
+                    self.echo(
+                        "Could not remove file at '%s' -- removing from GC" % path
+                    )
+                    # We still remove to prevent the garbage collector from
+                    # being stuck a few lines below.
+                    del self.gc_queue[path]
             else:
                 break

@@ -169,14 +190,15 @@ def ensure_path(self, path):

     def open_tempdir(self, token, action_name, stream_key):
         self._gc_objects()
-
         if self.total_size > self.max_size:
-            self.warn(None, "Cache soft limit reached! Used %d bytes, max %s bytes"
-                      % (self.total_size, self.max_size))
+            self.warn(
+                None,
+                "Cache soft limit reached! Used %d bytes, max %s bytes"
+                % (self.total_size, self.max_size),
+            )

         try:
-            tmp = tempfile.mkdtemp(prefix='cache_action_%s.' % token,
-                                   dir=self.tmproot)
+            tmp = tempfile.mkdtemp(prefix="cache_action_%s." % token, dir=self.tmproot)
         except Exception as ex:
             msg = "Could not create a temp directory for request %s" % token
             self.warn(ex, msg)
@@ -187,7 +209,7 @@ def open_tempdir(self, token, action_name, stream_key):
             # make sure that the new symlink points at a valid (empty!)
             # file by creating a dummy destination file
             self.ensure_path(src)
-            open_res = self.safe_fileop(open, dst, 'w')
+            open_res = self.safe_fileop(open, dst, "w")
             if open_res:
                 _, f = open_res
                 f.close()
@@ -198,8 +220,7 @@ def open_tempdir(self, token, action_name, stream_key):
                 # simultaneously. We don't consider an existing
                 # symlink (errno 17) to be an error.
                 if ex.errno != 17:
-                    err = "Could not create a symlink %s->%s"\
-                        % (src, dst)
+                    err = "Could not create a symlink %s->%s" % (src, dst)
                     self.warn(ex, err)
         except Exception as ex:
             self.warn(ex, "Unknown error")
@@ -227,6 +248,12 @@ def _insert(queue, key, value):
             # previous entry first
             queue.pop(key, None)
             queue[key] = value
+            # If we are inserting something in disposables_queue or objects_queue,
+            # we make sure it is no longer in the gc_queue. This can happen if, for
+            # example, an object is marked as deletable, is therefore not "readable"
+            # and is therefore re-created.
+            if key in self.gc_queue:
+                del self.gc_queue[key]

         disposables = frozenset(disposable_keys)
         missing = []
@@ -243,21 +270,19 @@ def _insert(queue, key, value):
             if os.path.exists(src):
                 dst = object_path(self.root, key)
                 self.ensure_path(dst)
-                stat_res = self.safe_fileop(os.stat, src)
-                if stat_res and self.safe_fileop(os.rename, src, dst):
-                    _, info = stat_res
-                    size = info.st_size
+                sz = filesize(src)
+                if sz is not None and self.safe_fileop(os.rename, src, dst):
                     if key in disposables:
                         # we proceed even if we fail to mark the object as
                         # disposable. It just means that during a possible
                         # restart the object is treated as a non-disposable
                         # object
                         tstamp = TIMESTAMP_FOR_DISPOSABLE
                         self.safe_fileop(os.utime, dst, (tstamp, tstamp))
-                        _insert(self.disposables_queue, dst, size)
+                        _insert(self.disposables_queue, dst, sz)
                     else:
-                        _insert(self.objects_queue, dst, size)
-                    self.total_size += size
+                        _insert(self.objects_queue, dst, sz)
+                    self.total_size += sz
                 else:
                     missing.append(key)

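
Taken together, the GC changes in this file reduce to the pattern below. This is a condensed, illustrative sketch of the quarantine sweep only, not the actual module: gc_queue maps path -> (time marked, size) in insertion (oldest-first) order, and quarantine plays the role of GC_MARKER_QUARANTINE.

    import os
    import time

    def gc_pass(gc_queue, quarantine):
        """One sweep over objects previously marked deletable."""
        limit = time.time() - quarantine
        freed = 0
        for path, (marked_at, size) in list(gc_queue.items()):
            if marked_at >= limit:
                break  # oldest-first order: the rest are still in quarantine
            try:
                os.unlink(path)
                freed += size
            except OSError:
                # Could not remove the file: drop the entry anyway so the
                # collector cannot get stuck retrying the same path forever
                # (the fix made in this commit).
                pass
            del gc_queue[path]
        return freed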

services/utils/__init__.py

Lines changed: 3 additions & 0 deletions
@@ -134,6 +134,9 @@ def format_qs(query: Dict[str, str], overwrite=None):
 def format_baseurl(request: web.BaseRequest):
     scheme = request.headers.get("X-Forwarded-Proto") or request.scheme
     host = request.headers.get("X-Forwarded-Host") or request.host
+    # Only get the first Forwarded-Host/Proto in case there are more than one
+    scheme = scheme.split(",")[0].strip()
+    host = host.split(",")[0].strip()
     baseurl = os.environ.get(
         "MF_BASEURL", "{scheme}://{host}".format(scheme=scheme, host=host))
     return "{baseurl}{path}".format(baseurl=baseurl, path=request.path)
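
When a request passes through several proxies, each hop may append its own value, so these headers can arrive as comma-separated lists such as "https, http"; taking only the first element recovers the scheme and host the original client used. A standalone sketch of the same normalization (no aiohttp required; the helper name is ours, not from the repo):

    def first_forwarded_value(header_value, fallback):
        # "https, http" -> "https"; a missing/empty header falls back.
        if not header_value:
            return fallback
        return header_value.split(",")[0].strip()

    assert first_forwarded_value("https, http", "http") == "https"
    assert first_forwarded_value(None, "http") == "http"
    assert first_forwarded_value("example.com, internal:8080", "x") == "example.com"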
