Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions kolibri/core/auth/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from django.core.management import call_command
from django.core.management.base import CommandError
from django.utils import timezone
from morango.errors import MorangoError
from requests.exceptions import HTTPError
from rest_framework import serializers
from rest_framework.exceptions import AuthenticationFailed
from rest_framework.exceptions import ValidationError
Expand Down Expand Up @@ -41,6 +43,7 @@
from kolibri.core.tasks.permissions import IsAdminForJob
from kolibri.core.tasks.permissions import IsSuperAdmin
from kolibri.core.tasks.permissions import NotProvisioned
from kolibri.core.tasks.utils import DatabaseLockedError
from kolibri.core.tasks.utils import get_current_job
from kolibri.core.tasks.validation import JobValidator
from kolibri.utils.time_utils import naive_utc_datetime
Expand Down Expand Up @@ -607,6 +610,7 @@ def validate(self, data):
permission_classes=[IsSuperAdmin() | NotProvisioned()],
status_fn=status_fn,
long_running=True,
retry_on=[DatabaseLockedError, MorangoError, HTTPError],
)
def peeruserimport(command, **kwargs):
call_command(command, **kwargs)
Expand Down
3 changes: 3 additions & 0 deletions kolibri/core/tasks/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def register_task(
permission_classes=None,
long_running=False,
status_fn=None,
retry_on=None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good job avoiding a classic Python gotcha! (Passing mutable values such as [] as default arguments is a very common mistake that can cause issues.)

):
"""
Registers the decorated function as task.
Expand All @@ -36,6 +37,7 @@ def register_task(
permission_classes=permission_classes,
long_running=long_running,
status_fn=status_fn,
retry_on=retry_on,
)

return RegisteredTask(
Expand All @@ -49,4 +51,5 @@ def register_task(
permission_classes=permission_classes,
long_running=long_running,
status_fn=status_fn,
retry_on=retry_on,
)
33 changes: 32 additions & 1 deletion kolibri/core/tasks/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import traceback
import uuid
from collections import namedtuple
from datetime import timedelta

from kolibri.core.tasks.constants import ( # noqa F401 - imported for backwards compatibility
Priority,
Expand Down Expand Up @@ -101,6 +102,13 @@ def default_status_text(job):

ALLOWED_RETRY_IN_KWARGS = {"priority", "repeat", "interval", "retry_interval"}

# Delay before retrying a job that failed due to a retryable exception
RETRY_ON_DELAY = timedelta(seconds=5)

# Maximum number of retries for a job that failed due to a retryable exception
MAX_RETRIES = 3


class Job(object):
"""
Expand Down Expand Up @@ -129,6 +137,7 @@ class Job(object):
"facility_id",
"func",
"long_running",
"retry_on",
}

def to_json(self):
Expand Down Expand Up @@ -169,6 +178,7 @@ def from_job(cls, job, **kwargs):
kwargs["track_progress"] = job.track_progress
kwargs["cancellable"] = job.cancellable
kwargs["long_running"] = job.long_running
kwargs["retry_on"] = job.retry_on.copy()
kwargs["extra_metadata"] = job.extra_metadata.copy()
kwargs["facility_id"] = job.facility_id
return cls(job.func, **kwargs)
Expand All @@ -190,6 +200,7 @@ def __init__(
total_progress=0,
result=None,
long_running=False,
retry_on=None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like we don't need to store this in the job object - we're not allowing this to be customized per job, only per task - so I think we can just reference this from the task itself, rather than having to pass it in at job initialization. This also saves us having to coerce the exception classes to import paths.

):
"""
Create a new Job that will run func given the arguments passed to Job(). If the track_progress keyword parameter
Expand Down Expand Up @@ -231,6 +242,7 @@ def __init__(
self.kwargs = kwargs or {}
self._storage = None
self.func = callable_to_import_path(func)
self.retry_on = [callable_to_import_path(exc) for exc in (retry_on or [])]

def _check_storage_attached(self):
if self._storage is None:
Expand Down Expand Up @@ -362,6 +374,8 @@ def execute(self):

args, kwargs = copy.copy(self.args), copy.copy(self.kwargs)

should_retry = False

try:
# First check whether the job has been cancelled
self.check_for_cancel()
Expand All @@ -370,6 +384,7 @@ def execute(self):
except UserCancelledError:
self.storage.mark_job_as_canceled(self.job_id)
except Exception as e:
should_retry = self.should_retry(e)
# If any error occurs, mark the job as failed and save the exception
traceback_str = traceback.format_exc()
e.traceback = traceback_str
Expand All @@ -379,10 +394,26 @@ def execute(self):
self.storage.mark_job_as_failed(self.job_id, e, traceback_str)

self.storage.reschedule_finished_job_if_needed(
self.job_id, delay=self._retry_in_delay, **self._retry_in_kwargs
self.job_id,
delay=RETRY_ON_DELAY if should_retry else self._retry_in_delay,
**self._retry_in_kwargs,
)
setattr(current_state_tracker, "job", None)

def should_retry(self, exception):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'd rather defer all this logic to the reschedule_finished_job_if_needed method on the storage class, rather than having it in the job class.

retries = self.extra_metadata.get("retries", 0) + 1
self.extra_metadata["retries"] = retries
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit iffy about using extra_metadata for tracking this - I think if we want to hack the existing schema, 'repeat' is probably a better place for this, but I wonder if we should instead extend the job table schema with an error_retries field, so that we can put a sensible default in place for failing tasks and they don't endlessly repeat.

I also think I'd rather have the retry interval defined by the task registration (we could also set a sensible default if retryable exceptions are set).

self.save_meta()
if retries > MAX_RETRIES:
return False

for retry_exception in self.retry_on:
exc = import_path_to_callable(retry_exception)
if isinstance(exception, exc):
logger.info(f"Retrying job {self.job_id} due to exception {exception}")
return True
return False

@property
def task(self):
"""
Expand Down
14 changes: 14 additions & 0 deletions kolibri/core/tasks/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ def __init__( # noqa: C901
permission_classes=None,
long_running=False,
status_fn=None,
retry_on=None,
):
"""
:param func: Function to be wrapped as a Registered task
Expand Down Expand Up @@ -229,6 +230,7 @@ def __init__( # noqa: C901
self.track_progress = track_progress
self.long_running = long_running
self._status_fn = status_fn
self.retry_on = self._validate_retry_on(retry_on)

# Make this wrapper object look seamlessly like the wrapped function
update_wrapper(self, func)
Expand Down Expand Up @@ -258,6 +260,17 @@ def _validate_permissions_classes(self, permission_classes):
else:
yield permission_class

def _validate_retry_on(self, retry_on):
if retry_on is None:
return []

if not isinstance(retry_on, list):
raise TypeError("retry_on must be a list of exceptions")
for item in retry_on:
if not issubclass(item, Exception):
raise TypeError("Each item in retry_on must be an Exception subclass")
return retry_on

def check_job_permissions(self, user, job, view):
for permission in self.permissions:
if not permission.has_permission(user, job, view):
Expand Down Expand Up @@ -395,6 +408,7 @@ def _ready_job(self, **job_kwargs):
cancellable=job_kwargs.pop("cancellable", self.cancellable),
track_progress=job_kwargs.pop("track_progress", self.track_progress),
long_running=job_kwargs.pop("long_running", self.long_running),
retry_on=self.retry_on,
**job_kwargs
)
return job_obj
Expand Down
2 changes: 2 additions & 0 deletions kolibri/core/tasks/test/test_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def add(x, y):
cancellable=True,
track_progress=True,
status_fn=status_fn,
retry_on=[],
)(add)

MockRegisteredTask.assert_called_once_with(
Expand All @@ -39,6 +40,7 @@ def add(x, y):
track_progress=True,
long_running=False,
status_fn=status_fn,
retry_on=[],
)

def test_register_decorator_registers_without_args(self):
Expand Down
71 changes: 71 additions & 0 deletions kolibri/core/tasks/test/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@

import mock
from django.test.testcases import TestCase
from requests.exceptions import HTTPError

from kolibri.core.tasks.constants import Priority
from kolibri.core.tasks.exceptions import JobNotRunning
from kolibri.core.tasks.job import Job
from kolibri.core.tasks.job import MAX_RETRIES
from kolibri.core.tasks.permissions import IsSuperAdmin
from kolibri.core.tasks.registry import RegisteredTask
from kolibri.core.tasks.utils import current_state_tracker
Expand All @@ -17,6 +19,10 @@ def status_fn(job):
pass


def fn_with_http_error():
    """Job function used by the retry tests; it always fails with an HTTPError."""
    error = HTTPError("Test exception")
    raise error


class JobTest(TestCase):
def setUp(self):
self.job = Job(id, track_progress=True)
Expand Down Expand Up @@ -181,6 +187,70 @@ def test_job_retry_in_all_allowable_values(self):
except Exception:
setattr(current_state_tracker, "job", None)

def test_job_retry_on_matching_exception(self):
    """
    A job whose function raises an exception listed in ``retry_on`` is
    rescheduled with a delay, and its retry counter is set to 1.
    """
    # The task raises an HTTPError, which is in the retry_on list, so it should be rescheduled
    job = Job(fn_with_http_error, retry_on=[HTTPError])
    job.storage = mock.MagicMock()
    # Registered before the tracker is touched so that it is reset even
    # when one of the assertions below fails.
    self.addCleanup(setattr, current_state_tracker, "job", None)
    setattr(current_state_tracker, "job", job)

    job.execute()

    reschedule = job.storage.reschedule_finished_job_if_needed
    reschedule.assert_called_once()

    # If delay was set to the reschedule call, it means it will be retried
    args, kwargs = reschedule.call_args
    self.assertEqual(args[0], job.job_id)
    self.assertEqual(job.extra_metadata.get("retries"), 1)
    self.assertIsNotNone(kwargs.get("delay"))

def test_job_retry_on_max_retries_not_exceeded(self):
    """
    A job that has failed ``MAX_RETRIES - 1`` times before is still
    rescheduled with a delay, and its counter reaches ``MAX_RETRIES``.
    """
    job = Job(fn_with_http_error, retry_on=[HTTPError])
    job.storage = mock.MagicMock()
    job.extra_metadata["retries"] = MAX_RETRIES - 1  # Still allowed to retry
    # Registered before the tracker is touched so that it is reset even
    # when one of the assertions below fails.
    self.addCleanup(setattr, current_state_tracker, "job", None)
    setattr(current_state_tracker, "job", job)

    job.execute()

    reschedule = job.storage.reschedule_finished_job_if_needed
    reschedule.assert_called_once()
    args, kwargs = reschedule.call_args
    self.assertEqual(args[0], job.job_id)
    self.assertEqual(job.extra_metadata.get("retries"), MAX_RETRIES)
    self.assertIsNotNone(kwargs.get("delay"))

def test_job_retry_on_max_retries_exceeded(self):
    """
    A job whose retry counter already equals ``MAX_RETRIES`` is not given a
    retry delay when it fails again with a retryable exception.
    """
    job = Job(fn_with_http_error, retry_on=[HTTPError])
    job.storage = mock.MagicMock()
    job.extra_metadata["retries"] = MAX_RETRIES  # Already reached max retries
    # Registered before the tracker is touched so that it is reset even
    # when one of the assertions below fails.
    self.addCleanup(setattr, current_state_tracker, "job", None)
    setattr(current_state_tracker, "job", job)

    job.execute()

    reschedule = job.storage.reschedule_finished_job_if_needed
    reschedule.assert_called_once()
    args, kwargs = reschedule.call_args
    self.assertEqual(args[0], job.job_id)
    self.assertIsNone(kwargs.get("delay"))  # No delay means no retry

def test_job_retry_on_non_matching_exception(self):
    """
    A job whose function raises an exception that is not listed in
    ``retry_on`` is not given a retry delay.
    """
    # The task raises an HTTPError, which is not in the retry_on list, so it should not be rescheduled
    job = Job(fn_with_http_error, retry_on=[ValueError])  # Different exception type
    job.storage = mock.MagicMock()
    # Registered before the tracker is touched so that it is reset even
    # when one of the assertions below fails.
    self.addCleanup(setattr, current_state_tracker, "job", None)
    setattr(current_state_tracker, "job", job)

    job.execute()

    reschedule = job.storage.reschedule_finished_job_if_needed
    reschedule.assert_called_once()
    args, kwargs = reschedule.call_args
    self.assertEqual(args[0], job.job_id)
    self.assertIsNone(kwargs.get("delay"))  # No delay means no retry

# Test generated by Claude 3.7 Sonnet and tweaked

def test_job_update_progress_throttles_small_updates(self):
Expand Down Expand Up @@ -402,6 +472,7 @@ def test__ready_job(self, MockJob):
track_progress=True,
long_running=True,
kwargs=dict(base=10), # kwarg that was passed to _ready_job()
retry_on=[],
)

# Do we return the job object?
Expand Down
17 changes: 17 additions & 0 deletions kolibri/core/tasks/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from threading import Thread

import click
from django.db.utils import OperationalError
from django.utils.functional import SimpleLazyObject
from django.utils.module_loading import import_string
from sqlalchemy import create_engine
Expand Down Expand Up @@ -385,3 +386,19 @@ def fd_safe_executor(fds_per_task=2):
)

return executor(max_workers=max_workers)


class DatabaseLockedError(OperationalError):
    """
    OperationalError subclass that can only be constructed when the first
    positional argument, converted to text, contains 'database is locked'
    (compared case-insensitively).

    Constructing it with any other message raises a plain OperationalError
    built from the same arguments instead.

    NOTE(review): nothing in the visible code raises this class. Confirm
    that some caller wraps locked-database OperationalErrors in it;
    otherwise an isinstance() check against DatabaseLockedError will never
    match an error raised by the database layer.
    """

    def __init__(self, *args, **kwargs):
        first_arg_text = str(args[0]).lower() if args else ""

        if "database is locked" in first_arg_text:
            super().__init__(*args, **kwargs)
            return

        # Not a locked-database message: surface a plain OperationalError
        # carrying the same arguments.
        raise OperationalError(*args, **kwargs)
Loading