From ab8e3e36db6a6ca7c0ecf4bea6a471485869e519 Mon Sep 17 00:00:00 2001
From: lucasgomide
Date: Thu, 29 Feb 2024 10:43:27 -0300
Subject: [PATCH] refactor: process Task discard in parallel

---
 django_cloud_tasks/tasks/task.py | 25 +++++++++++++++----------
 1 file changed, 15 insertions(+), 10 deletions(-)

diff --git a/django_cloud_tasks/tasks/task.py b/django_cloud_tasks/tasks/task.py
index fd7e4a9..9b410be 100644
--- a/django_cloud_tasks/tasks/task.py
+++ b/django_cloud_tasks/tasks/task.py
@@ -6,7 +6,7 @@
 from random import randint
 from typing import Any, Self
 from urllib.parse import urljoin
-
+from concurrent.futures import ThreadPoolExecutor
 from django.apps import apps
 from django.urls import reverse
 from django.utils.timezone import now
@@ -319,18 +319,23 @@ def discard(cls, task_id: str | None = None, min_retries: int = 0):
         else:
             task_objects = client.list_tasks(queue_name=cls.queue())
 
-        outputs = []
-        for task_obj in task_objects:
+        def process(task_obj):
             task_name = task_obj.http_request.url.rsplit("/", 1)[-1]
-            if task_name != cls.name():
-                continue
-
-            if task_obj.dispatch_count < min_retries:
-                continue
-
             task_id = task_obj.name.split("/")[-1]
             client.delete_task(queue_name=cls.queue(), task_name=task_id)
-            outputs.append(f"{task_name}/{task_id}")
+            return f"{task_name}/{task_id}"
+
+        def jobs():
+            for task_obj in task_objects:
+                task_name = task_obj.http_request.url.rsplit("/", 1)[-1]
+                if task_name == cls.name() and task_obj.dispatch_count >= min_retries:
+                    yield task_obj
+
+        pool = ThreadPoolExecutor()
+
+        outputs = []
+        for output in pool.map(process, jobs()):
+            outputs.append(output)
         return outputs
 
     @classmethod
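
Note (illustrative sketch, not part of the patch): the second hunk replaces the sequential filter-and-delete loop with a process() worker mapped over a jobs() generator via ThreadPoolExecutor.map. The standalone sketch below shows the same pattern with the executor used as a context manager so worker threads are joined when the call returns. The function name discard_matching and the parameters client, task_objects, queue_name, and expected_name are stand-ins mirroring the patch, not the library's public API.

from concurrent.futures import ThreadPoolExecutor


def discard_matching(client, task_objects, queue_name, expected_name, min_retries=0):
    """Sketch: delete matching tasks in parallel, returning a "name/id" string per task."""

    def process(task_obj):
        # Task name comes from the handler URL, task id from the resource name.
        task_name = task_obj.http_request.url.rsplit("/", 1)[-1]
        task_id = task_obj.name.split("/")[-1]
        client.delete_task(queue_name=queue_name, task_name=task_id)
        return f"{task_name}/{task_id}"

    def jobs():
        # Yield only tasks whose name matches and whose retry count meets the threshold.
        for task_obj in task_objects:
            task_name = task_obj.http_request.url.rsplit("/", 1)[-1]
            if task_name == expected_name and task_obj.dispatch_count >= min_retries:
                yield task_obj

    # The context manager shuts the pool down (joining its threads) once map() drains.
    with ThreadPoolExecutor() as pool:
        return list(pool.map(process, jobs()))

Compared with the patch, list(pool.map(...)) replaces the manual append loop, and the with block avoids leaving an un-shutdown executor behind on every call.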