Skip to content

Commit ee96090

Browse files
committed
Add basic support for manual retries
1 parent 0ac596c commit ee96090

File tree

10 files changed

+117
-14
lines changed

10 files changed

+117
-14
lines changed

README.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -216,13 +216,13 @@ Note that this is just the type of exception, and contains no other values. The
216216
assert isinstance(result.errors[0].traceback, str)
217217
```
218218

219-
Note that currently, whilst `.errors` is a list, it will only ever contain a single element.
220-
221219
#### Attempts
222220

223-
The number of times a task has been run is stored as the `.attempts` attribute. This will currently only ever be 0 or 1.
221+
The number of times a task has been run is stored as the `.attempts` attribute. The date of the last attempt is stored as `.last_attempted_at`.
222+
223+
#### Retries
224224

225-
The date of the last attempt is stored as `.last_attempted_at`.
225+
A task result can be retried by calling `.retry()` (or `.aretry`). This adds the task back to the queue, retaining the `id`.
226226

227227
### Backend introspecting
228228

@@ -231,6 +231,7 @@ Because `django-tasks` enables support for multiple different backends, those ba
231231
- `supports_defer`: Can tasks be enqueued with the `run_after` attribute?
232232
- `supports_async_task`: Can coroutines be enqueued?
233233
- `supports_get_result`: Can results be retrieved after the fact (from **any** thread / process)?
234+
- `supports_retries`: Can results be retried?
234235

235236
```python
236237
from django_tasks import default_task_backend

django_tasks/backends/base.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ class BaseTaskBackend(metaclass=ABCMeta):
3232
supports_get_result = False
3333
"""Can results be retrieved after the fact (from **any** thread / process)"""
3434

35+
supports_retries = False
36+
"""Can results be retried"""
37+
3538
def __init__(self, alias: str, params: dict) -> None:
3639
from django_tasks import DEFAULT_QUEUE_NAME
3740

@@ -129,3 +132,17 @@ def check(self, **kwargs: Any) -> Iterable[messages.CheckMessage]:
129132
"`ENQUEUE_ON_COMMIT` cannot be used when no databases are configured",
130133
hint="Set `ENQUEUE_ON_COMMIT` to False",
131134
)
135+
136+
def retry(self, task_result: TaskResult) -> None:
137+
"""
138+
Retry the task by putting it back into the queue store.
139+
"""
140+
raise NotImplementedError("This backend does not support retries.")
141+
142+
async def aretry(self, task_result: TaskResult) -> None:
143+
"""
144+
Retry the task by putting it back into the queue store.
145+
"""
146+
return await sync_to_async(self.retry, thread_sensitive=True)(
147+
task_result=task_result
148+
)

django_tasks/backends/database/backend.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ class DatabaseBackend(BaseTaskBackend):
3030
supports_async_task = True
3131
supports_get_result = True
3232
supports_defer = True
33+
supports_retries = False
3334

3435
def _task_to_db_task(
3536
self,

django_tasks/backends/dummy.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
class DummyBackend(BaseTaskBackend):
2121
supports_defer = True
2222
supports_async_task = True
23+
supports_retries = False
24+
2325
results: list[TaskResult]
2426

2527
def __init__(self, alias: str, params: dict) -> None:

django_tasks/backends/immediate.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@ def _execute_task(self, task_result: TaskResult) -> None:
3333
"""
3434
Execute the task for the given `TaskResult`, mutating it with the outcome
3535
"""
36-
object.__setattr__(task_result, "enqueued_at", timezone.now())
37-
task_enqueued.send(type(self), task_result=task_result)
36+
if task_result.enqueued_at is None:
37+
object.__setattr__(task_result, "enqueued_at", timezone.now())
38+
task_enqueued.send(type(self), task_result=task_result)
3839

3940
task = task_result.task
4041

@@ -106,3 +107,6 @@ def enqueue(
106107
self._execute_task(task_result)
107108

108109
return task_result
110+
111+
def retry(self, task_result: TaskResult) -> None:
112+
self._execute_task(task_result)

django_tasks/backends/rq.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
from collections.abc import Iterable
2-
from dataclasses import dataclass
32
from types import TracebackType
43
from typing import Any, Optional, TypeVar
54

@@ -24,8 +23,8 @@
2423
ResultStatus,
2524
Task,
2625
TaskError,
26+
TaskResult,
2727
)
28-
from django_tasks.task import TaskResult as BaseTaskResult
2928
from django_tasks.utils import get_module_path, get_random_id
3029

3130
T = TypeVar("T")
@@ -44,11 +43,6 @@
4443
}
4544

4645

47-
@dataclass(frozen=True)
48-
class TaskResult(BaseTaskResult[T]):
49-
pass
50-
51-
5246
class Job(BaseJob):
5347
def perform(self) -> Any:
5448
task_result = self.into_task_result()
@@ -257,3 +251,17 @@ def check(self, **kwargs: Any) -> Iterable[messages.CheckMessage]:
257251
f"{queue_name!r} is not configured for django-rq",
258252
f"Add {queue_name!r} to RQ_QUEUES",
259253
)
254+
255+
def retry(self, task_result: TaskResult) -> None:
256+
job = self._get_job(task_result.id)
257+
258+
if job is None:
259+
raise ResultDoesNotExist(task_result.id)
260+
261+
if job.retries_left:
262+
queue = django_rq.get_queue(task_result.task.queue_name)
263+
264+
with queue.connection.pipeline() as pipeline:
265+
job.retry(queue=queue, pipeline=pipeline)
266+
else:
267+
job.requeue()

django_tasks/task.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,3 +331,21 @@ async def arefresh(self) -> None:
331331

332332
for attr in TASK_REFRESH_ATTRS:
333333
object.__setattr__(self, attr, getattr(refreshed_task, attr))
334+
335+
def retry(self) -> None:
336+
"""
337+
Retry the task by putting it back into the queue store.
338+
"""
339+
if self.status != ResultStatus.FAILED:
340+
raise ValueError("Only failed tasks can be retried")
341+
342+
self.task.get_backend().retry(self)
343+
344+
async def aretry(self) -> None:
345+
"""
346+
Retry the task by putting it back into the queue store.
347+
"""
348+
if self.status != ResultStatus.FAILED:
349+
raise ValueError("Only failed tasks can be retried")
350+
351+
await self.task.get_backend().aretry(self)

tests/tests/test_dummy_backend.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,21 @@ def test_enqueue_on_commit_with_no_databases(self) -> None:
195195
self.assertEqual(len(errors), 1)
196196
self.assertIn("Set `ENQUEUE_ON_COMMIT` to False", errors[0].hint) # type:ignore[arg-type]
197197

198+
async def test_no_retry(self) -> None:
199+
result = test_tasks.noop_task.enqueue()
200+
201+
with self.assertRaises(ValueError):
202+
result.retry()
203+
204+
with self.assertRaises(ValueError):
205+
await result.aretry()
206+
207+
with self.assertRaises(NotImplementedError):
208+
default_task_backend.retry(result)
209+
210+
with self.assertRaises(NotImplementedError):
211+
await default_task_backend.aretry(result)
212+
198213

199214
class DummyBackendTransactionTestCase(TransactionTestCase):
200215
@override_settings(

tests/tests/test_immediate_backend.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,23 @@ def test_check(self) -> None:
257257

258258
self.assertEqual(len(errors), 0, errors)
259259

260+
def test_retry(self) -> None:
261+
with self.assertLogs("django_tasks", level="ERROR"):
262+
result = test_tasks.failing_task_value_error.enqueue()
263+
264+
self.assertEqual(result.status, ResultStatus.FAILED)
265+
self.assertEqual(result.attempts, 1)
266+
self.assertEqual(len(result.errors), 1)
267+
original_enqueued = result.enqueued_at
268+
269+
with self.assertLogs("django_tasks", level="ERROR"):
270+
result.retry()
271+
272+
self.assertEqual(result.status, ResultStatus.FAILED)
273+
self.assertEqual(result.attempts, 2)
274+
self.assertEqual(result.enqueued_at, original_enqueued)
275+
self.assertEqual(len(result.errors), 2)
276+
260277

261278
class ImmediateBackendTransactionTestCase(TransactionTestCase):
262279
@override_settings(

tests/tests/test_rq_backend.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def get_fake_connection(
6161
},
6262
)
6363
@modify_settings(INSTALLED_APPS={"append": ["django_rq"]})
64-
class DatabaseBackendTestCase(TransactionTestCase):
64+
class RQBackendTestCase(TransactionTestCase):
6565
def setUp(self) -> None:
6666
super().setUp()
6767

@@ -491,3 +491,23 @@ def test_unknown_queue_name(self) -> None:
491491

492492
self.assertEqual(len(errors), 1)
493493
self.assertIn("Add 'queue-2' to RQ_QUEUES", errors[0].hint) # type:ignore[arg-type]
494+
495+
def test_retry(self) -> None:
496+
result = test_tasks.failing_task_value_error.enqueue()
497+
498+
with self.assertLogs("django_tasks", "DEBUG"):
499+
self.run_worker()
500+
result.refresh()
501+
502+
self.assertEqual(result.status, ResultStatus.FAILED)
503+
self.assertEqual(result.attempts, 1)
504+
self.assertEqual(len(result.errors), 1)
505+
506+
result.retry()
507+
with self.assertLogs("django_tasks", "DEBUG"):
508+
self.run_worker()
509+
result.refresh()
510+
511+
self.assertEqual(result.status, ResultStatus.FAILED)
512+
self.assertEqual(result.attempts, 2)
513+
self.assertEqual(len(result.errors), 2)

0 commit comments

Comments
 (0)