Skip to content

Commit

Permalink
refactor: process Task discard in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
lucasgomide committed Feb 29, 2024
1 parent fbb933a commit ab8e3e3
Showing 1 changed file with 15 additions and 10 deletions.
25 changes: 15 additions & 10 deletions django_cloud_tasks/tasks/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from random import randint
from typing import Any, Self
from urllib.parse import urljoin

from concurrent.futures import ThreadPoolExecutor
from django.apps import apps
from django.urls import reverse
from django.utils.timezone import now
Expand Down Expand Up @@ -319,18 +319,23 @@ def discard(cls, task_id: str | None = None, min_retries: int = 0):
else:
task_objects = client.list_tasks(queue_name=cls.queue())

outputs = []
for task_obj in task_objects:
def process(task_obj):
task_name = task_obj.http_request.url.rsplit("/", 1)[-1]
if task_name != cls.name():
continue

if task_obj.dispatch_count < min_retries:
continue

task_id = task_obj.name.split("/")[-1]
client.delete_task(queue_name=cls.queue(), task_name=task_id)
outputs.append(f"{task_name}/{task_id}")
return f"{task_name}/{task_id}"

def jobs():
for task_obj in task_objects:
task_name = task_obj.http_request.url.rsplit("/", 1)[-1]
if task_name == cls.name() and task_obj.dispatch_count >= min_retries:
yield task_obj

pool = ThreadPoolExecutor()

outputs = []
for output in pool.map(process, jobs()):
outputs.append(output)
return outputs

@classmethod
Expand Down

0 comments on commit ab8e3e3

Please sign in to comment.