forked from openstate/open-raadsinformatie
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mixins.py
30 lines (23 loc) · 1.18 KB
/
mixins.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from ocd_backend.utils.misc import load_object
class OCDBackendTaskMixin(object):
    """Provide a ``cleanup`` method to the task classes that inherit from it.

    Inheriting classes call ``cleanup`` themselves, so the same cleanup
    behaviour can be executed either when a task fails (for instance, when
    it is somewhere in the middle of a chain), or when a task is
    successfully executed (for instance, when a loader successfully inserted
    its data into the storage system).

    The cleanup itself is a `Task` that is defined by a dotted path in
    `sources.json`; it is read here from the ``cleanup`` entry of
    ``self.source_definition``, which the inheriting class must provide.
    """

    def cleanup(self, **kwargs):
        """Load the configured cleanup task and hand it off via ``delay``.

        :param kwargs: keyword arguments forwarded unchanged to the cleanup
            task's ``delay`` call.
        :returns: ``None``. Nothing is loaded or scheduled when the source
            definition has no (or an empty) ``cleanup`` entry.
        """
        cleanup_path = self.source_definition.get('cleanup')
        if not cleanup_path:
            # ``.get`` yields None when the source defines no cleanup task.
            # Without a dotted path there is nothing to load, so skip
            # instead of passing None on to ``load_object``.
            return

        # ``load_object`` resolves the dotted path to a class; instantiate
        # it before scheduling.
        cleanup_task = load_object(cleanup_path)()
        cleanup_task.delay(**kwargs)
class OCDBackendTaskFailureMixin(OCDBackendTaskMixin):
    """Mixin for tasks that should execute `self.cleanup` when the Task
    fails."""

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Failure hook: run cleanup with the task's keyword arguments.

        Only `kwargs` is used; `exc`, `task_id`, `args` and `einfo` are
        accepted to match the hook signature and are otherwise ignored.
        """
        run_cleanup = self.cleanup
        run_cleanup(**kwargs)
class OCDBackendTaskSuccessMixin(OCDBackendTaskMixin):
    """Mixin for tasks that should execute `self.cleanup` when the Task
    succeeds.

    NOTE(review): the hook implemented here is `after_return` and its
    `status` argument is never inspected, so cleanup runs for whatever
    status the hook is invoked with -- confirm that this matches the
    intended "on success" trigger.
    """

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        """Return hook: run cleanup with the task's keyword arguments.

        Only `kwargs` is used; `status`, `retval`, `task_id`, `args` and
        `einfo` are accepted to match the hook signature and are otherwise
        ignored.
        """
        run_cleanup = self.cleanup
        run_cleanup(**kwargs)