Skip to content

Commit 5c89bb7

Browse files
committed
fix: better handle errors in celery result backend
- use celery's own "retry on result-backend error" logic - remove `die_on_unhandled` hack - new env var: `CELERY_RESULT_BACKEND_MAX_RETRIES` (default 173) - retry on error caused by `get_or_create` in overlapping transactions (with less-strict transaction isolation levels) - port fixes from celery's `BaseKeyValueStoreBackend`: - avoid clobbering successes with "worker lost" or other errors - avoid error trying to get non-existent task results - use celery's `BaseBackend` instead of `BaseDictBackend` (equivalent for back-compat; let's use the better name)
1 parent 150997a commit 5c89bb7

File tree

2 files changed

+33
-29
lines changed

2 files changed

+33
-29
lines changed

project/settings.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,8 @@ def split(string, delim):
336336
}
337337

338338
CELERY_RESULT_BACKEND = 'share.celery:CeleryDatabaseBackend'
339+
CELERY_RESULT_BACKEND_ALWAYS_RETRY = True
340+
CELERY_RESULT_BACKEND_MAX_RETRIES = int(os.environ.get('CELERY_RESULT_BACKEND_MAX_RETRIES', 173))
339341
CELERY_RESULT_EXPIRES = int(os.environ.get(
340342
'CELERY_RESULT_EXPIRES',
341343
60 * 60 * 24 * 3, # 3 days

share/celery.py

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,17 @@
11
import datetime
2-
import functools
32
import logging
43

54
from celery import states
65
from celery.app.task import Context
7-
from celery.backends.base import BaseDictBackend
6+
from celery.backends.base import BaseBackend
87
from celery.utils.time import maybe_timedelta
98

109
from django.conf import settings
1110
from django.db import transaction
1211
from django.db.models import Q
12+
from django.db.utils import IntegrityError
1313
from django.utils import timezone
1414

15-
import sentry_sdk
16-
1715
from share.models import CeleryTaskResult
1816
from share.models.sql import GroupBy
1917
from trove.util.django import pk_chunked
@@ -22,25 +20,8 @@
2220
logger = logging.getLogger(__name__)
2321

2422

25-
def die_on_unhandled(func):
26-
@functools.wraps(func)
27-
def wrapped(*args, **kwargs):
28-
err = None
29-
try:
30-
return func(*args, **kwargs)
31-
except Exception as e:
32-
err = e
33-
try:
34-
logger.exception('Celery internal method %s failed', func)
35-
sentry_sdk.capture_exception()
36-
finally:
37-
if err:
38-
raise SystemExit(57) # Something a bit less generic than 1 or -1
39-
return wrapped
40-
41-
4223
# Based on https://github.com/celery/django-celery-results/commit/f88c677d66ba1eaf1b7cb1f3b8c910012990984f
43-
class CeleryDatabaseBackend(BaseDictBackend):
24+
class CeleryDatabaseBackend(BaseBackend):
4425
"""
4526
4627
Implemented from scratch rather than subclassed due to:
@@ -53,8 +34,28 @@ class CeleryDatabaseBackend(BaseDictBackend):
5334
"""
5435
TaskModel = CeleryTaskResult
5536

56-
@die_on_unhandled
37+
# override BaseBackend
38+
def exception_safe_to_retry(self, exc):
39+
# retry error from multiple simultaneous `get_or_create` calls --
40+
# re-raises an IntegrityError while handling TaskModel.DoesNotExist
41+
# https://github.com/django/django/blob/9e7cc2b628fe8fd3895986af9b7fc9525034c1b0/django/db/models/query.py#L959
42+
return (
43+
isinstance(exc, IntegrityError)
44+
and isinstance(exc.__context__, self.TaskModel.DoesNotExist)
45+
)
46+
47+
# implement for BaseBackend
5748
def _store_result(self, task_id, result, status, traceback=None, request=None, **kwargs):
49+
_already_successful = (
50+
self.TaskModel.objects
51+
.filter(task_id=task_id, status=states.SUCCESS)
52+
.exists()
53+
)
54+
if _already_successful:
55+
# avoid clobbering prior successful result, which could be caused by network partition or lost worker, ostensibly:
56+
# https://github.com/celery/celery/blob/92514ac88afc4ccdff31f3a1018b04499607ca1e/celery/backends/base.py#L967-L972
57+
return
58+
5859
fields = {
5960
'result': result,
6061
'traceback': traceback,
@@ -88,20 +89,21 @@ def _store_result(self, task_id, result, status, traceback=None, request=None, *
8889
setattr(obj, key, value)
8990
obj.save()
9091

91-
return obj
92-
93-
@die_on_unhandled
92+
# override BaseBackend
9493
def cleanup(self, expires=None):
9594
TaskResultCleaner(
9695
success_ttl=(expires or self.expires),
9796
nonsuccess_ttl=settings.FAILED_CELERY_RESULT_EXPIRES,
9897
).clean()
9998

100-
@die_on_unhandled
99+
# implement for BaseBackend
101100
def _get_task_meta_for(self, task_id):
102-
return self.TaskModel.objects.get(task_id=task_id).as_dict()
101+
try:
102+
return self.TaskModel.objects.get(task_id=task_id).as_dict()
103+
except self.TaskModel.DoesNotExist:
104+
return {'status': states.PENDING, 'result': None}
103105

104-
@die_on_unhandled
106+
# implement for BaseBackend
105107
def _forget(self, task_id):
106108
try:
107109
self.TaskModel.objects.get(task_id=task_id).delete()

0 commit comments

Comments (0)