Skip to content

Commit

Permalink
feat: ensure valid task_name before create a Routine
Browse files Browse the repository at this point in the history
  • Loading branch information
lucasgomide committed Feb 23, 2022
1 parent 885d036 commit b41110d
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 2 deletions.
3 changes: 3 additions & 0 deletions django_cloud_tasks/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,6 @@ async def _get_subscriptions():
removed.append(subscribed_task)

return updated, removed

def ready(self):
from django_cloud_tasks import signals
4 changes: 2 additions & 2 deletions django_cloud_tasks/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ def revert(self):
self.status = self.Statuses.ABORTED
self.save()

@property
def task(self) -> Optional[tasks.Task]:
app = apps.get_app_config("django_cloud_tasks")
tasks = app.on_demand_tasks.copy()
return tasks.get(self.task_name)
return app.get_task(name=self.task_name)


class RoutineVertex(models.Model):
Expand Down
13 changes: 13 additions & 0 deletions django_cloud_tasks/signals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from django.core.exceptions import ValidationError
from django.db.models.signals import pre_save
from django.dispatch import receiver

from django_cloud_tasks import models, exceptions


@receiver(pre_save, sender=models.Routine)
def ensure_valid_task_name(sender, instance, **kwargs):
try:
instance.task
except exceptions.TaskNotFound:
raise ValidationError(f"The task {instance.task_name} was not found. Make sure {instance.task_name} is properly set.")
5 changes: 5 additions & 0 deletions sample_project/sample_app/tests/tests_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ def tests_revert_not_processed_routine(self):
self.assertEqual("aborted", routine.status)
revert_task.assert_not_called()

def tests_ensure_valid_task_name(self):
task_name = "InvalidTaskName"
with self.assertRaises(ValidationError, msg=f"The task {task_name} was not found. Make sure {task_name} is properly set."):
factories.RoutineFactory(task_name=task_name)

class PipelineModelTest(TestCase):
def tests_start_pipeline(self):
pipeline = factories.PipelineFactory()
Expand Down

0 comments on commit b41110d

Please sign in to comment.