Skip to content

Commit

Permalink
fix: ensure that the Routine retry flow is properly dispatched
Browse files Browse the repository at this point in the history
We are not able to start the retry flow of a pipeline because the Routine._diff was not being properly calculated. To fix that, I just ensure that the ModelDiffMixin.save will be triggered by changing the inheritance ordering.

This commit also improves some tests in order to ensure that the retrying flow (failure and success) is being properly tested.
  • Loading branch information
lucasgomide authored and joaodaher committed Jul 13, 2023
1 parent 8410058 commit eedbc39
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 40 deletions.
2 changes: 1 addition & 1 deletion django_cloud_tasks/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def add_routine(self, routine: dict) -> "Routine":
return self.routines.create(**routine)


class Routine(models.Model, ModelDiffMixin):
class Routine(ModelDiffMixin, models.Model):
class Statuses(models.TextChoices):
PENDING = ("pending", "Pending")
SCHEDULED = ("scheduled", "Scheduled")
Expand Down
11 changes: 2 additions & 9 deletions django_cloud_tasks/tasks/routine_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,22 +56,15 @@ def process_routine(self, routine: models.Routine):
routine.status = models.Routine.Statuses.RUNNING
routine.save(update_fields=("attempt_count", "status", "updated_at"))

# we are adding this to re-instantiate this object due to
# a bug that are happening with _diff field from ModelDiffMixin.
# the complete method called below is triggering the ensure_status_machine
# with wrong previous_status. when we call complete(), we had previous status
# scheduled, but we just changed it to running. this was raising an error:
# changing from scheduled to complete is not allowed.
routine = models.Routine(**routine._dict)

try:
logger.info(f"Routine #{routine.pk} is running")
task_response = routine.task_class(metadata=self._metadata).sync(**routine.body)
except Exception as error:
logger.info(f"Routine #{routine.pk} has failed")

routine.fail(output={"error": str(error)})
logger.info(f"Routine #{routine.pk} is being enqueued to retry")
routine.enqueue()
logger.info(f"Routine #{routine.pk} has been enqueued for retry")
return

routine.complete(output=task_response)
Expand Down
69 changes: 39 additions & 30 deletions sample_project/sample_app/tests/tests_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,20 +258,28 @@ def tests_start_pipeline_revert_flow_if_exceeded_retries(self):
routine = factories.RoutineWithoutSignalFactory(
status="running",
task_name="SayHelloTask",
max_retries=1,
attempt_count=2,
max_retries=3,
attempt_count=1,
)
with patch("django_cloud_tasks.models.Pipeline.revert") as revert:
with self.assertLogs(level="INFO") as context:
RoutineExecutorTask.asap(routine_id=routine.pk)
self.assertEqual(
context.output,
[
f"INFO:root:Routine #{routine.id} has exhausted retries and is being reverted",
],
)
self.assert_routine_lock(routine_id=routine.pk)
revert.assert_called_once()
with patch("sample_app.tasks.SayHelloTask.sync", side_effect=Exception("any error")):
RoutineExecutorTask.asap(routine_id=routine.pk)
self.assertEqual(
context.output,
[
f"INFO:root:Routine #{routine.id} is running",
f"INFO:root:Routine #{routine.id} has failed",
f"INFO:root:Routine #{routine.id} is being enqueued to retry",
f"INFO:root:Routine #{routine.id} is running",
f"INFO:root:Routine #{routine.id} has failed",
f"INFO:root:Routine #{routine.id} is being enqueued to retry",
f"INFO:root:Routine #{routine.id} has exhausted retries and is being reverted",
],
)

self.assert_routine_lock(routine_id=routine.pk)
revert.assert_called_once()

def tests_store_task_output_into_routine(self):
routine = factories.RoutineWithoutSignalFactory(
Expand All @@ -294,30 +302,31 @@ def tests_store_task_output_into_routine(self):
self.assertEqual("completed", routine.status)
self.assertEqual(2, routine.attempt_count)

def tests_fail_routine_if_task_has_failed(self):
def tests_retry_and_complete_task_processing_once_failure(self):
routine = factories.RoutineWithoutSignalFactory(
status="running",
status="scheduled",
task_name="SayHelloTask",
body={"attributes": [1, 2, 3]},
attempt_count=1,
attempt_count=0,
max_retries=2,
)
with self.assertLogs(level="INFO") as context:
with patch("sample_app.tasks.SayHelloTask.sync", side_effect=Exception("any error")):
with patch("django_cloud_tasks.models.Routine.enqueue") as enqueue:
RoutineExecutorTask.sync(routine_id=routine.pk)
self.assert_routine_lock(routine_id=routine.pk)
routine.refresh_from_db()
self.assertEqual(
context.output,
[
f"INFO:root:Routine #{routine.id} is running",
f"INFO:root:Routine #{routine.id} has failed",
f"INFO:root:Routine #{routine.id} has been enqueued for retry",
],
)
self.assertEqual("failed", routine.status)
enqueue.assert_called_once()
self.assertEqual(2, routine.attempt_count)
with patch("sample_app.tasks.SayHelloTask.sync", side_effect=[Exception("any error"), "success"]):
RoutineExecutorTask.sync(routine_id=routine.pk)
self.assert_routine_lock(routine_id=routine.pk)
routine.refresh_from_db()
self.assertEqual(
context.output,
[
f"INFO:root:Routine #{routine.id} is running",
f"INFO:root:Routine #{routine.id} has failed",
f"INFO:root:Routine #{routine.id} is being enqueued to retry",
f"INFO:root:Routine #{routine.id} is running",
f"INFO:root:Routine #{routine.id} just completed",
],
)
self.assertEqual("completed", routine.status)
self.assertEqual(2, routine.attempt_count)


class SayHelloTaskTest(TestCase, tests_base.RoutineTaskTestMixin):
Expand Down

0 comments on commit eedbc39

Please sign in to comment.