Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion django_tasks/backends/database/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,15 @@

@admin.register(DBTaskResult)
class DBTaskResultAdmin(admin.ModelAdmin):
list_display = ("id", "get_task_name", "status", "priority", "queue_name")
list_display = (
"id",
"get_task_name",
"status",
"enqueued_at",
"finished_at",
"priority",
"queue_name",
)
list_filter = ("status", "priority", "queue_name")

def has_add_permission(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Generated by Django 5.0.6 on 2024-06-13 10:24

import django.utils.timezone
from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("django_tasks_database", "0002_alter_dbtaskresult_options"),
]

operations = [
migrations.AddField(
model_name="dbtaskresult",
name="enqueued_at",
field=models.DateTimeField(
auto_now_add=True, default=django.utils.timezone.now
),
preserve_default=False,
),
migrations.AddField(
model_name="dbtaskresult",
name="finished_at",
field=models.DateTimeField(null=True),
),
]
11 changes: 9 additions & 2 deletions django_tasks/backends/database/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ class DBTaskResult(GenericBase[P, T], models.Model):
max_length=max(len(value) for value in ResultStatus.values),
)

enqueued_at = models.DateTimeField(auto_now_add=True)
finished_at = models.DateTimeField(null=True)

args_kwargs = models.JSONField()

priority = models.PositiveSmallIntegerField(default=0)
Expand Down Expand Up @@ -108,6 +111,8 @@ def task_result(self) -> "TaskResult[T]":
task=self.task,
id=str(self.id),
status=ResultStatus[self.status],
enqueued_at=self.enqueued_at,
finished_at=self.finished_at,
args=self.args_kwargs["args"],
kwargs=self.args_kwargs["kwargs"],
backend=self.backend_name,
Expand All @@ -126,9 +131,11 @@ def claim(self) -> None:

def set_result(self, result: Any) -> None:
self.status = ResultStatus.COMPLETE
self.finished_at = timezone.now()
self.result = result
self.save(update_fields=["status", "result"])
self.save(update_fields=["status", "result", "finished_at"])

def set_failed(self) -> None:
self.status = ResultStatus.FAILED
self.save(update_fields=["status"])
self.finished_at = timezone.now()
self.save(update_fields=["status", "finished_at"])
3 changes: 3 additions & 0 deletions django_tasks/backends/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import List, TypeVar
from uuid import uuid4

from django.utils import timezone
from typing_extensions import ParamSpec

from django_tasks.exceptions import ResultDoesNotExist
Expand Down Expand Up @@ -33,6 +34,8 @@ def enqueue(
task=task,
id=str(uuid4()),
status=ResultStatus.NEW,
enqueued_at=timezone.now(),
finished_at=None,
args=json_normalize(args),
kwargs=json_normalize(kwargs),
backend=self.alias,
Expand Down
4 changes: 4 additions & 0 deletions django_tasks/backends/immediate.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from uuid import uuid4

from asgiref.sync import async_to_sync
from django.utils import timezone
from typing_extensions import ParamSpec

from django_tasks.task import ResultStatus, Task, TaskResult
Expand All @@ -26,6 +27,7 @@ def enqueue(
async_to_sync(task.func) if iscoroutinefunction(task.func) else task.func
)

enqueued_at = timezone.now()
try:
result = json_normalize(calling_task_func(*args, **kwargs))
status = ResultStatus.COMPLETE
Expand All @@ -37,6 +39,8 @@ def enqueue(
task=task,
id=str(uuid4()),
status=status,
enqueued_at=enqueued_at,
finished_at=timezone.now(),
args=json_normalize(args),
kwargs=json_normalize(kwargs),
backend=self.alias,
Expand Down
12 changes: 10 additions & 2 deletions django_tasks/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,12 @@ class TaskResult(Generic[T]):
status: ResultStatus
"""The status of the running task"""

enqueued_at: datetime
"""The time this task was enqueued"""

finished_at: Optional[datetime]
"""The time this task was finished"""

args: list
"""The arguments to pass to the task function"""

Expand Down Expand Up @@ -207,8 +213,9 @@ def refresh(self) -> None:
"""
refreshed_task = self.task.get_backend().get_result(self.id)

# status and result are the only refreshable attributes
# status, finished_at and result are the only refreshable attributes
self.status = refreshed_task.status
self.finished_at = refreshed_task.finished_at
self._result = refreshed_task._result

async def arefresh(self) -> None:
Expand All @@ -217,6 +224,7 @@ async def arefresh(self) -> None:
"""
refreshed_task = await self.task.get_backend().aget_result(self.id)

# status and result are the only refreshable attributes
# status, finished_at and result are the only refreshable attributes
self.status = refreshed_task.status
self.finished_at = refreshed_task.finished_at
self._result = refreshed_task._result
18 changes: 16 additions & 2 deletions tests/tests/test_database_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def test_enqueue_task(self) -> None:
result = default_task_backend.enqueue(task, (1,), {"two": 3})

self.assertEqual(result.status, ResultStatus.NEW)
self.assertIsNone(result.finished_at)
with self.assertRaisesMessage(ValueError, "Task has not finished yet"):
result.result # noqa:B018
self.assertEqual(result.task, task)
Expand All @@ -43,6 +44,7 @@ async def test_enqueue_task_async(self) -> None:
result = await default_task_backend.aenqueue(task, [], {})

self.assertEqual(result.status, ResultStatus.NEW)
self.assertIsNone(result.finished_at)
with self.assertRaisesMessage(ValueError, "Task has not finished yet"):
result.result # noqa:B018
self.assertEqual(result.task, task)
Expand Down Expand Up @@ -70,22 +72,30 @@ def test_refresh_result(self) -> None:
test_tasks.calculate_meaning_of_life, (), {}
)

DBTaskResult.objects.all().update(status=ResultStatus.COMPLETE)
DBTaskResult.objects.all().update(
status=ResultStatus.COMPLETE, finished_at=timezone.now()
)

self.assertEqual(result.status, ResultStatus.NEW)
self.assertIsNone(result.finished_at)
with self.assertNumQueries(1):
result.refresh()
self.assertIsNotNone(result.finished_at)
self.assertEqual(result.status, ResultStatus.COMPLETE)

async def test_refresh_result_async(self) -> None:
result = await default_task_backend.aenqueue(
test_tasks.calculate_meaning_of_life, (), {}
)

await DBTaskResult.objects.all().aupdate(status=ResultStatus.COMPLETE)
await DBTaskResult.objects.all().aupdate(
status=ResultStatus.COMPLETE, finished_at=timezone.now()
)

self.assertEqual(result.status, ResultStatus.NEW)
self.assertIsNone(result.finished_at)
await result.arefresh()
self.assertIsNotNone(result.finished_at)
self.assertEqual(result.status, ResultStatus.COMPLETE)

def test_get_missing_result(self) -> None:
Expand Down Expand Up @@ -162,6 +172,8 @@ def test_run_enqueued_task(self) -> None:

self.assertEqual(result.status, ResultStatus.NEW)
result.refresh()
self.assertIsNotNone(result.finished_at)
self.assertGreaterEqual(result.finished_at, result.enqueued_at)
self.assertEqual(result.status, ResultStatus.COMPLETE)

self.assertEqual(DBTaskResult.objects.ready().count(), 0)
Expand Down Expand Up @@ -223,6 +235,8 @@ def test_failing_task(self) -> None:

self.assertEqual(result.status, ResultStatus.NEW)
result.refresh()
self.assertIsNotNone(result.finished_at)
self.assertGreaterEqual(result.finished_at, result.enqueued_at) # type: ignore[arg-type]
self.assertEqual(result.status, ResultStatus.FAILED)

self.assertEqual(DBTaskResult.objects.ready().count(), 0)
Expand Down
2 changes: 2 additions & 0 deletions tests/tests/test_dummy_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def test_enqueue_task(self) -> None:
result = default_task_backend.enqueue(task, (1,), {"two": 3})

self.assertEqual(result.status, ResultStatus.NEW)
self.assertIsNone(result.finished_at)
with self.assertRaisesMessage(ValueError, "Task has not finished yet"):
result.result # noqa:B018
self.assertEqual(result.task, task)
Expand All @@ -40,6 +41,7 @@ async def test_enqueue_task_async(self) -> None:
result = await default_task_backend.aenqueue(task, (), {})

self.assertEqual(result.status, ResultStatus.NEW)
self.assertIsNone(result.finished_at)
with self.assertRaisesMessage(ValueError, "Task has not finished yet"):
result.result # noqa:B018
self.assertEqual(result.task, task)
Expand Down
6 changes: 6 additions & 0 deletions tests/tests/test_immediate_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ def test_enqueue_task(self) -> None:
result = default_task_backend.enqueue(task, (1,), {"two": 3})

self.assertEqual(result.status, ResultStatus.COMPLETE)
self.assertIsNotNone(result.finished_at)
self.assertGreaterEqual(result.finished_at, result.enqueued_at)
self.assertIsNone(result.result)
self.assertEqual(result.task, task)
self.assertEqual(result.args, [1])
Expand All @@ -35,6 +37,8 @@ async def test_enqueue_task_async(self) -> None:
result = await default_task_backend.aenqueue(task, (), {})

self.assertEqual(result.status, ResultStatus.COMPLETE)
self.assertIsNotNone(result.finished_at)
self.assertGreaterEqual(result.finished_at, result.enqueued_at)
self.assertIsNone(result.result)
self.assertEqual(result.task, task)
self.assertEqual(result.args, [])
Expand All @@ -44,6 +48,8 @@ def test_catches_exception(self) -> None:
result = default_task_backend.enqueue(test_tasks.failing_task, [], {})

self.assertEqual(result.status, ResultStatus.FAILED)
self.assertIsNotNone(result.finished_at)
self.assertGreaterEqual(result.finished_at, result.enqueued_at)
self.assertIsNone(result.result)
self.assertEqual(result.task, test_tasks.failing_task)
self.assertEqual(result.args, [])
Expand Down