deferred / deferrable task are not take in account by max_active_tasks #40528

Open
1 of 2 tasks
raphaelauv opened this issue Jul 1, 2024 · 5 comments · May be fixed by #41560
Assignees
Labels
affected_version:2.9 area:async-operators AIP-40: Deferrable ("Async") Operators area:Triggerer kind:bug This is a clearly a bug

Comments

@raphaelauv
Contributor

raphaelauv commented Jul 1, 2024

Apache Airflow version

2.9.2

What happened?

Setting max_active_tasks does not limit the number of concurrent deferred tasks.

It is possible to restrict the number of concurrent deferred tasks with a pool, but I want to limit the number of tasks run by a DAG while keeping the option for each task to use its own specific pool (a task can only use one pool in Airflow).

What you think should happen instead?

max_active_tasks should take deferred tasks into account, or we should add a new boolean setting, max_active_tasks_include_deferred, like what was done for pools.

How to reproduce

from datetime import datetime, timedelta, timezone

from airflow import DAG
from airflow.sensors.time_sensor import TimeSensorAsync
from pendulum import today

# max_active_tasks=10 is expected to cap concurrency, yet more than 10 sensors can be deferred at once.
with DAG(
    dag_id="example",
    schedule_interval="0 0 * * *",
    max_active_tasks=10,
    start_date=today("UTC").add(days=-1),
):
    for i in range(100):
        # Each sensor defers until 10 minutes after the DAG file was parsed.
        TimeSensorAsync(
            task_id=f"a_{i}",
            target_time=(datetime.now(tz=timezone.utc) + timedelta(minutes=10)).time(),
        )

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@raphaelauv raphaelauv added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels Jul 1, 2024
@dosubot dosubot bot added the area:async-operators AIP-40: Deferrable ("Async") Operators label Jul 1, 2024
@RNHTTR RNHTTR added area:Triggerer and removed area:core needs-triage label for new issues that we didn't triage yet labels Jul 2, 2024
@gopidesupavan
Collaborator

@raphaelauv @kaxil @RNHTTR I would like to look into this. Could you please assign it to me?

@RNHTTR
Contributor

RNHTTR commented Jul 31, 2024

Go for it!

@gopidesupavan
Collaborator

Thank you @RNHTTR for assigning this to me. I was looking into it, and my understanding of the ask is: limit the total number of deferrable tasks per DAG? Thanks @raphaelauv for raising this.

As I understand triggers and the deferred state, when a task defers, a new row is created in the trigger table, and once the task state is set to DEFERRED, the triggerer job picks up these deferred tasks from the trigger table and runs them in an asyncio event loop.

The max_active_tasks check is done by the scheduler job, so there is no interaction between the scheduler job and the triggerer job. To limit the maximum number of deferred tasks per DAG, I would suggest the approach below.

Create a new state, something like DEFERRED_QUEUE. When capacity is available (<= max deferrable tasks), move the task to DEFERRED; the triggerer job runner should only run tasks in the DEFERRED state.

Please share your thoughts :)
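
For illustration only, the DEFERRED_QUEUE idea above could be sketched as below. This is plain Python, not Airflow code: the `DEFERRED_QUEUE` state, the `TaskInstance` dataclass, and the `promote_deferred` helper are all hypothetical names made up for this sketch, and none of them exists in Airflow.

```python
from __future__ import annotations

from dataclasses import dataclass

# Hypothetical state proposed in this comment; it does not exist in Airflow.
DEFERRED_QUEUE = "deferred_queue"
DEFERRED = "deferred"


@dataclass
class TaskInstance:
    """Simplified stand-in for a task instance (not Airflow's model)."""

    task_id: str
    state: str


def promote_deferred(tasks: list[TaskInstance], max_deferred: int) -> list[str]:
    """Move waiting tasks to DEFERRED while capacity remains.

    Returns the ids of the tasks promoted in this pass.
    """
    currently_deferred = sum(1 for t in tasks if t.state == DEFERRED)
    free_slots = max(0, max_deferred - currently_deferred)
    promoted: list[str] = []
    for task in tasks:
        if free_slots == 0:
            break
        if task.state == DEFERRED_QUEUE:
            task.state = DEFERRED  # only DEFERRED tasks would be run by the triggerer
            promoted.append(task.task_id)
            free_slots -= 1
    return promoted
```

In this sketch a periodic pass would call `promote_deferred` for each DAG, so the triggerer never sees more than `max_deferred` tasks from that DAG at once.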

@RNHTTR
Contributor

RNHTTR commented Aug 13, 2024

max_active_tasks already exists to limit the number of tasks in the running or queued state for a given DAG. The proposal here is to include tasks that are in the deferred state in this calculation.

Per @raphaelauv's suggestion, I think there should be a new parameter, max_active_tasks_include_deferred, which will be False by default. When this parameter is set to True, tasks in the deferred state should count toward the max_active_tasks calculation.
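
A minimal sketch of the counting rule described here, in plain Python rather than scheduler code. The helper names `active_task_count` and `open_slots` are assumptions made for illustration, and the `include_deferred` argument stands in for the proposed (not yet existing) max_active_tasks_include_deferred setting.

```python
from collections import Counter
from typing import Iterable

# State names as plain strings; the logic below is a simplification, not the scheduler's code.
RUNNING, QUEUED, DEFERRED = "running", "queued", "deferred"


def active_task_count(states: Iterable[str], include_deferred: bool = False) -> int:
    """Count task instances that occupy a max_active_tasks slot."""
    counted = {RUNNING, QUEUED}
    if include_deferred:
        # Proposed behaviour: deferred tasks also consume a slot.
        counted.add(DEFERRED)
    tally = Counter(states)
    return sum(tally[state] for state in counted)


def open_slots(states: Iterable[str], max_active_tasks: int, include_deferred: bool = False) -> int:
    """Number of additional tasks that could start for this DAG."""
    return max(0, max_active_tasks - active_task_count(states, include_deferred))
```

With 2 running and 8 deferred tasks and max_active_tasks=10, `open_slots` reports 8 free slots under the default and 0 when `include_deferred=True`, which is the difference the new parameter is meant to expose.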

@gopidesupavan
Collaborator

Sure, thank you for the clarification :) I will come up with a PR.
