Skip to content

fix: single task executor getting all tasks from Redis queue #7330

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

wanpdsantos
Copy link
Contributor

What problem does this PR solve?

Currently, as long as there are tasks in Redis, this loop will keep getting the tasks. This will lead to a single task executor with many tasks in the pending state. Then we need to wait for the pending tasks to get them back in the queue.

In first place, if we set the MAX_CONCURRENT_TASKS to X, then only X tasks should be picked from the queue, and others should be left in the queue for other task_executors or be picked after 1 of the spots in the current executor gets free. This PR ensures this behavior.

The additional changes were due to the Ruff linting in pre-commit. But I believe these are expected to keep the coding style.

Type of change

  • Bug Fix (non-breaking change which fixes an issue)
  • New Feature (non-breaking change which adds functionality)
  • Documentation Update
  • Refactoring
  • Performance Improvement
  • Other (please describe):

@dosubot dosubot bot added size:L This PR changes 100-499 lines, ignoring generated files. 🐞 bug Something isn't working, pull request that fix bug. labels Apr 26, 2025
async with task_limiter:
nursery.start_soon(handle_task)
nursery.start_soon(limited_handle_task)
await trio.sleep(1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! However your change is smell. It creates an async task every 1 second so there may be very many limited_handle_task tasks while only 5 of them are inside handle_task.

@yuzhichang
Copy link
Member

trio.Semaphore is better here. Be careful to ensure handle_task release semaphore finally.

import trio
import random

task_limiter = trio.Semaphore(3)


async def handle_task(id: int):
    try:
        print(f"{trio.current_time()}: Worker {id} started")
        await trio.sleep(random.randint(1, 5))
        print(f"{trio.current_time()}: Worker {id} done")
    finally:
        task_limiter.release()


async def handle_tasks():
    id = 0
    async with trio.open_nursery() as nursery:
        while True:
            await task_limiter.acquire()
            nursery.start_soon(handle_task, id)
            id += 1


trio.run(handle_tasks)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
🐞 bug Something isn't working, pull request that fix bug. size:L This PR changes 100-499 lines, ignoring generated files.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants