Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions contentcuration/contentcuration/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ class DelayUserStorageCalculation(ContextDecorator):
def is_active(self):
return self.depth > 0

def add(self, user_id):
if user_id not in self.queue:
self.queue.append(user_id)

def __enter__(self):
self.depth += 1

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import logging

from django.core.management.base import BaseCommand

from contentcuration.celery import app
from contentcuration.models import Change
from contentcuration.models import User

logging.basicConfig()
logger = logging.getLogger('command')


class Command(BaseCommand):
"""
Reconciles that unready tasks are marked as reserved or active according to celery control
"""

def handle(self, *args, **options):
from contentcuration.tasks import apply_channel_changes_task
from contentcuration.tasks import apply_user_changes_task

active_task_ids = []
for worker_name, tasks in app.control.inspect().active().items():
active_task_ids.extend(task['id'] for task in tasks)
for worker_name, tasks in app.control.inspect().reserved().items():
active_task_ids.extend(task['id'] for task in tasks)

channel_changes = Change.objects.filter(channel_id__isnull=False, applied=False, errored=False) \
.order_by('channel_id', 'created_by_id') \
.values('channel_id', 'created_by_id') \
.distinct()
for channel_change in channel_changes:
apply_channel_changes_task.revoke(exclude_task_ids=active_task_ids, channel_id=channel_change['channel_id'])
apply_channel_changes_task.fetch_or_enqueue(
User.objects.get(pk=channel_change['created_by_id']),
channel_id=channel_change['channel_id']
)

user_changes = Change.objects.filter(channel_id__isnull=True, user_id__isnull=False, applied=False, errored=False) \
.order_by('user_id', 'created_by_id') \
.values('user_id', 'created_by_id') \
.distinct()
for user_change in user_changes:
apply_user_changes_task.revoke(exclude_task_ids=active_task_ids, user_id=user_change['user_id'])
apply_user_changes_task.fetch_or_enqueue(
User.objects.get(pk=user_change['created_by_id']),
user_id=user_change['user_id']
)
2 changes: 2 additions & 0 deletions contentcuration/contentcuration/tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from importlib import import_module

import mock
from celery import states

from contentcuration.models import TaskResult

Expand All @@ -20,6 +21,7 @@ def clear_tasks(except_task_id=None):
qs = qs.exclude(task_id=except_task_id)
for task_id in qs.values_list("task_id", flat=True):
app.control.revoke(task_id, terminate=True)
qs.update(status=states.REVOKED)


def mock_class_instance(target):
Expand Down
17 changes: 17 additions & 0 deletions contentcuration/contentcuration/tests/test_asynctask.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ def _wait_for(self, async_result, timeout=30):
with allow_join_result():
return async_result.get(timeout=timeout)

def test_app_count_queued_tasks(self):
self.assertIsInstance(app.count_queued_tasks(), int)

def test_asynctask_reports_success(self):
"""
Tests that when an async task is created and completed, the Task object has a status of 'SUCCESS' and
Expand Down Expand Up @@ -245,3 +248,17 @@ def test_requeue_task(self):
second_result = self._wait_for(second_async_result)
self.assertIsNone(second_result)
self.assertTrue(second_async_result.successful())

def test_revoke_task(self):
channel_id = uuid.uuid4()
async_result = test_task.enqueue(self.user, channel_id=channel_id)
test_task.revoke(channel_id=channel_id)

# this should raise an exception, even though revoked, because the task is in ready state but not success
with self.assertRaises(Exception):
self._wait_for(async_result)

try:
TaskResult.objects.get(task_id=async_result.task_id, status=states.REVOKED)
except TaskResult.DoesNotExist:
self.fail('Missing revoked task result')
11 changes: 8 additions & 3 deletions contentcuration/contentcuration/tests/test_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,22 @@

from contentcuration.decorators import delay_user_storage_calculation
from contentcuration.tests.base import StudioTestCase
from contentcuration.tests.base import testdata
from contentcuration.utils.user import calculate_user_storage


class DecoratorsTestCase(StudioTestCase):
def setUp(self):
super(DecoratorsTestCase, self).setUp()
self.user = testdata.user()

@mock.patch("contentcuration.utils.user.calculate_user_storage_task")
def test_delay_storage_calculation(self, mock_task):
@delay_user_storage_calculation
def do_test():
calculate_user_storage(self.admin_user.id)
calculate_user_storage(self.admin_user.id)
calculate_user_storage(self.user.id)
calculate_user_storage(self.user.id)
mock_task.fetch_or_enqueue.assert_not_called()

do_test()
mock_task.fetch_or_enqueue.assert_called_once_with(self.admin_user, user_id=self.admin_user.id)
mock_task.fetch_or_enqueue.assert_called_once_with(self.user, user_id=self.user.id)
9 changes: 9 additions & 0 deletions contentcuration/contentcuration/utils/celery/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ def get_queued_tasks(self, queue_name="celery"):

return decoded_tasks

def count_queued_tasks(self, queue_name="celery"):
"""
:param queue_name: The queue name, defaults to the default "celery" queue
:return: int
"""
with self.pool.acquire(block=True) as conn:
count = conn.default_channel.client.llen(queue_name)
return count

def decode_result(self, result, status=None):
"""
Decodes the celery result, like the raw result from the database, using celery tools
Expand Down
19 changes: 19 additions & 0 deletions contentcuration/contentcuration/utils/celery/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,25 @@ def requeue(self, **kwargs):
logging.info(f"Re-queuing task {self.name} for user {task_result.user.pk} from {request.id} | {task_kwargs}")
return self.enqueue(task_result.user, **task_kwargs)

def revoke(self, exclude_task_ids=None, **kwargs):
"""
Revokes and terminates all unready tasks matching the kwargs
:param exclude_task_ids: Any task ids to exclude from this behavior
:param kwargs: Task keyword arguments that will be used to match against tasks
:return: The number of tasks revoked
"""
task_ids = self.find_incomplete_ids(**self.backend.decode(self._prepare_kwargs(kwargs)))
if exclude_task_ids is not None:
task_ids = task_ids.exclude(task_id__in=task_ids)
count = 0
for task_id in task_ids:
logging.info(f"Revoking task {task_id}")
self.app.control.revoke(task_id, terminate=True)
count += 1
# be sure the database backend has these marked appropriately
self.TaskModel.objects.filter(task_id__in=task_ids).update(status=states.REVOKED)
return count


class CeleryAsyncResult(AsyncResult):
"""
Expand Down
5 changes: 3 additions & 2 deletions contentcuration/contentcuration/utils/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ def calculate_user_storage(user_id):
from contentcuration.decorators import delay_user_storage_calculation

if delay_user_storage_calculation.is_active:
delay_user_storage_calculation.queue.append(user_id)
delay_user_storage_calculation.add(user_id)
return

try:
if user_id is None:
raise User.DoesNotExist
user = User.objects.get(pk=user_id)
calculate_user_storage_task.fetch_or_enqueue(user, user_id=user_id)
if not user.is_admin:
calculate_user_storage_task.fetch_or_enqueue(user, user_id=user_id)
except User.DoesNotExist:
logging.error("Tried to calculate user storage for user with id {} but they do not exist".format(user_id))
4 changes: 2 additions & 2 deletions contentcuration/contentcuration/views/internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from contentcuration.api import activate_channel
from contentcuration.api import write_file_to_storage
from contentcuration.constants import completion_criteria
from contentcuration.decorators import delay_user_storage_calculation
from contentcuration.models import AssessmentItem
from contentcuration.models import Change
from contentcuration.models import Channel
Expand All @@ -54,7 +55,6 @@
from contentcuration.utils.nodes import map_files_to_node
from contentcuration.utils.nodes import map_files_to_slideshow_slide_item
from contentcuration.utils.sentry import report_exception
from contentcuration.utils.tracing import trace
from contentcuration.viewsets.sync.constants import CHANNEL
from contentcuration.viewsets.sync.utils import generate_publish_event
from contentcuration.viewsets.sync.utils import generate_update_event
Expand Down Expand Up @@ -565,7 +565,7 @@ def __init__(self, node, errors):
super(IncompleteNodeError, self).__init__(self.message)


@trace
@delay_user_storage_calculation
def convert_data_to_nodes(user, content_data, parent_node):
""" Parse dict and create nodes accordingly """
try:
Expand Down