Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ set-tsvectors:
python contentcuration/manage.py set_channel_tsvectors
python contentcuration/manage.py set_contentnode_tsvectors --published

reconcile:
python contentcuration/manage.py reconcile_publishing_status
python contentcuration/manage.py reconcile_change_tasks

###############################################################
# END PRODUCTION COMMANDS #####################################
###############################################################
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import logging

from django.core.management.base import BaseCommand

from contentcuration.celery import app
from contentcuration.models import Channel

logging.basicConfig()
logger = logging.getLogger("command")


class Command(BaseCommand):
"""
Reconciles publishing status of channels.
If there's no active task for a publishing channel then we reset its publishing status
to False.
"""

def handle(self, *args, **options):
from contentcuration.tasks import apply_channel_changes_task

# Channels that are in `publishing` state.
publishing_channels = list(Channel.objects.filter(deleted=False, main_tree__publishing=True).values_list("id", flat=True))

# channel_ids of tasks that are currently being run by the celery workers.
active_channel_tasks = [task["kwargs"].get("channel_id") for task in app.get_active_tasks()
if task["name"] == apply_channel_changes_task.name]

# If channel is in publishing state and doesnot have any active task,
# that means the worker has crashed. So, we reset the publishing state to False.
for channel_id in publishing_channels:
if channel_id not in active_channel_tasks:
channel = Channel.objects.get(pk=channel_id)
channel.main_tree.publishing = False
channel.main_tree.save()
logger.info(f"Resetted publishing status to False for channel {channel.id}.")
10 changes: 10 additions & 0 deletions contentcuration/contentcuration/utils/celery/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ def get_active_and_reserved_tasks(self):
for task in tasks:
yield task

def get_active_tasks(self):
"""
Iterate over active tasks
:return: A list of dictionaries
"""
active = self.control.inspect().active() or {}
for _, tasks in active.items():
for task in tasks:
yield task

def decode_result(self, result, status=None):
"""
Decodes the celery result, like the raw result from the database, using celery tools
Expand Down
9 changes: 4 additions & 5 deletions contentcuration/contentcuration/utils/celery/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,8 @@ def my_task(self):
track_started = True
send_events = True

# ensure our tasks are restarted if they're interrupted
acks_late = True
acks_on_failure_or_timeout = True
reject_on_worker_lost = True
# Tasks are acknowledged just before they start executing
acks_late = False

@property
def TaskModel(self):
Expand Down Expand Up @@ -293,7 +291,7 @@ def revoke(self, exclude_task_ids=None, **kwargs):
task_ids = self.find_incomplete_ids(signature)

if exclude_task_ids is not None:
task_ids = task_ids.exclude(task_id__in=task_ids)
task_ids = task_ids.exclude(task_id__in=exclude_task_ids)
count = 0
for task_id in task_ids:
logging.info(f"Revoking task {task_id}")
Expand All @@ -311,6 +309,7 @@ class CeleryAsyncResult(AsyncResult):
The properties access additional properties in the same manner as super properties,
and our custom properties are added to the meta via TaskResultCustom.as_dict()
"""

def get_model(self):
"""
:return: The TaskResult model object
Expand Down