Retry peeruserimport task on Database or connection errors #13821
base: develop
@@ -4,6 +4,7 @@
 import traceback
 import uuid
 from collections import namedtuple
+from datetime import timedelta

 from kolibri.core.tasks.constants import (  # noqa F401 - imported for backwards compatibility
     Priority,
@@ -101,6 +102,13 @@ def default_status_text(job):

 ALLOWED_RETRY_IN_KWARGS = {"priority", "repeat", "interval", "retry_interval"}

+RETRY_ON_DELAY = timedelta(
+    seconds=5
+)  # Delay before retrying a job that failed due to a retryable exception
+MAX_RETRIES = (
+    3  # Maximum number of retries for a job that failed due to a retryable exception
+)
+

 class Job(object):
     """
@@ -129,6 +137,7 @@ class Job(object):
         "facility_id",
         "func",
         "long_running",
+        "retry_on",
     }

     def to_json(self):
@@ -169,6 +178,7 @@ def from_job(cls, job, **kwargs):
         kwargs["track_progress"] = job.track_progress
         kwargs["cancellable"] = job.cancellable
         kwargs["long_running"] = job.long_running
+        kwargs["retry_on"] = job.retry_on.copy()
         kwargs["extra_metadata"] = job.extra_metadata.copy()
         kwargs["facility_id"] = job.facility_id
         return cls(job.func, **kwargs)
@@ -190,6 +200,7 @@ def __init__(
         total_progress=0,
         result=None,
         long_running=False,
+        retry_on=None,
     ):
         """
         Create a new Job that will run func given the arguments passed to Job(). If the track_progress keyword parameter

Review comment (on the retry_on parameter): I feel like we don't need to store this in the job object - we're not allowing this to be customized per job, only per task - so I think we can just reference this from the task itself, rather than having to pass it in at job initialization. This also saves us having to coerce the exception classes to import paths.
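A rough sketch of the alternative this reviewer (and a later comment about retry intervals) is pointing at: declare the retryable exceptions and the retry delay when the task is registered, and have the job look them up from its registered task instead of persisting import paths on every Job. The retry_on and retry_interval keywords on register_task below are assumptions for illustration only, not existing Kolibri API, and the exception classes are placeholders for whatever "database or connection errors" means in the final PR.

# Hypothetical sketch - register_task's retry_on/retry_interval kwargs do not
# exist today; they only illustrate task-level configuration of retries.
from datetime import timedelta

from django.db import OperationalError

from kolibri.core.tasks.decorators import register_task


@register_task(
    retry_on=(OperationalError, ConnectionError),  # hypothetical kwarg, illustrative exceptions
    retry_interval=timedelta(seconds=5),           # hypothetical kwarg
)
def peeruserimport(**kwargs):
    ...


# Job.execute() could then consult the registered task instead of the job instance:
#     retry_on = getattr(self.task, "retry_on", ())
#     should_retry = isinstance(e, tuple(retry_on))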
@@ -231,6 +242,7 @@ def __init__(
         self.kwargs = kwargs or {}
         self._storage = None
         self.func = callable_to_import_path(func)
+        self.retry_on = [callable_to_import_path(exc) for exc in (retry_on or [])]

     def _check_storage_attached(self):
         if self._storage is None:
@@ -362,6 +374,8 @@ def execute(self):

         args, kwargs = copy.copy(self.args), copy.copy(self.kwargs)

+        should_retry = False
+
         try:
             # First check whether the job has been cancelled
             self.check_for_cancel()
@@ -370,6 +384,7 @@
         except UserCancelledError:
             self.storage.mark_job_as_canceled(self.job_id)
         except Exception as e:
+            should_retry = self.should_retry(e)
             # If any error occurs, mark the job as failed and save the exception
             traceback_str = traceback.format_exc()
             e.traceback = traceback_str
@@ -379,10 +394,26 @@
             self.storage.mark_job_as_failed(self.job_id, e, traceback_str)

         self.storage.reschedule_finished_job_if_needed(
-            self.job_id, delay=self._retry_in_delay, **self._retry_in_kwargs
+            self.job_id,
+            delay=RETRY_ON_DELAY if should_retry else self._retry_in_delay,
+            **self._retry_in_kwargs,
         )
         setattr(current_state_tracker, "job", None)

+    def should_retry(self, exception):
Review comment (on should_retry): I think I'd rather defer all this logic to the …
+        retries = self.extra_metadata.get("retries", 0) + 1
+        self.extra_metadata["retries"] = retries
Review comment (on the retries tracking): I am a bit iffy about using extra_metadata for tracking this - I think if we want to hack the existing schema, 'repeat' is probably a better place for this, but I wonder if instead we should add to the job table schema to add … I also think I'd rather have the retry interval defined by the task registration (we could also set a sensible default if retryable exceptions are set).
+        self.save_meta()
+        if retries > MAX_RETRIES:
+            return False
+
+        for retry_exception in self.retry_on:
+            exc = import_path_to_callable(retry_exception)
+            if isinstance(exception, exc):
+                logger.info(f"Retrying job {self.job_id} due to exception {exception}")
+                return True
+        return False
+
     @property
     def task(self):
         """
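Putting the pieces of this hunk together: when execute() catches an exception, should_retry() increments a retries counter kept in extra_metadata, gives up once the counter exceeds MAX_RETRIES, and otherwise re-imports each path in retry_on and checks it with isinstance; on a match the job is rescheduled RETRY_ON_DELAY (5 seconds) later. A minimal usage sketch follows, assuming Job is importable from kolibri.core.tasks.job and using a stand-in task function and illustrative exception classes (the exact classes used for peeruserimport are not shown in this diff):

# Hedged sketch - my_peer_import and the exception choices are placeholders;
# only the retry_on kwarg itself comes from this diff.
from django.db import OperationalError

from kolibri.core.tasks.job import Job


def my_peer_import(**kwargs):
    # Stand-in for the real peeruserimport task function.
    ...


job = Job(
    my_peer_import,
    retry_on=[OperationalError, ConnectionError],  # stored on the job as import paths
)
# If the function raises OperationalError, execute() marks the job as failed,
# should_retry() returns True for up to MAX_RETRIES attempts, and the job is
# rescheduled to run RETRY_ON_DELAY later via reschedule_finished_job_if_needed().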
Review comment (on the retry_on default): Good job avoiding a classic Python gotcha! (Passing mutable values as default arguments, such as [], is a very common mistake that can cause issues.)
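For readers who have not hit it before, this is the gotcha the comment refers to: a mutable default like retry_on=[] is created once when the function is defined and then shared by every call that omits the argument, which is why the diff uses retry_on=None together with retry_on or [].

# Minimal illustration of the mutable-default gotcha.
def add_bad(item, bucket=[]):
    # The same list object is reused across calls that omit `bucket`.
    bucket.append(item)
    return bucket


def add_good(item, bucket=None):
    # Fresh list per call - the pattern used for retry_on in this PR.
    bucket = bucket or []
    bucket.append(item)
    return bucket


print(add_bad("a"), add_bad("b"))    # ['a', 'b'] ['a', 'b'] - state leaks between calls
print(add_good("a"), add_good("b"))  # ['a'] ['b']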