Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding config max_active_tasks_include_deferred #41560

Open
wants to merge 9 commits into
base: main
Choose a base branch
from

Conversation

gopidesupavan
Copy link
Collaborator

Adding max_active_tasks_include_deferred.

max_active_tasks should take in account the deferred task when max_active_tasks_include_deferred=true

closes: #40528


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@boring-cyborg boring-cyborg bot added area:API Airflow's REST/HTTP API area:CLI area:Scheduler including HA (high availability) scheduler area:serialization labels Aug 18, 2024
@gopidesupavan gopidesupavan marked this pull request as draft August 18, 2024 11:18
@gopidesupavan
Copy link
Collaborator Author

Hi , can you please help me , which airflow_version should i set in the migration file?

@gopidesupavan gopidesupavan marked this pull request as ready for review August 20, 2024 21:12
@gopidesupavan
Copy link
Collaborator Author

gopidesupavan commented Aug 20, 2024

I am not sure exact version :), so have updated to 3.0.0 version, but please let me know if this version needs to be changed.

@uranusjr
Copy link
Member

This should be 3.0; we are not adding new features to 2.x at this point (unless they help the 2-3 migration; this one does not).

@gopidesupavan
Copy link
Collaborator Author

This should be 3.0; we are not adding new features to 2.x at this point (unless they help the 2-3 migration; this one does not).

Sure thank you @uranusjr :).

Comment on lines 350 to 328
concurrency_map = self.__get_concurrency_maps(states=EXECUTION_STATES, session=session)
concurrency_map_deferred = self.__get_concurrency_maps(states=DEFERRED_STATES, session=session)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be something like

if max_active_tasks_include_deferred:
    concurrency_map = self.__get_concurrency_maps(states=DEFERRED_STATES | EXECUTION_STATE, session=session)
else:
    concurrency_map = self.__get_concurrency_maps(states=EXECUTION_STATES, session=session)

to save one db query.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes agree to save one db query , but
I believe max_active_tasks_include_deferred config is accessible only inside for loop from the task_instances_to_examine

for task_instance in task_instances_to_examine:
, not sure how to update your suggestion in here at top level. happy to take some suggestion , please let me know. :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@uranusjr Can i have review please, is the updates are fine :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not quite sure how to apply the suggested approach because max_active_tasks_include_deferred is a DAG-level setting. A user might enable this option for only a few DAGs. Therefore, if we send states=DEFERRED_STATES | EXECUTION_STATE to __get_concurrency_maps, it could retrieve all tasks which are in deferred state, even for DAGs that do not have max_active_tasks_include_deferred enabled.

I have updated __get_concurrency_maps to check for max_active_tasks_include_deferred. i believe which is one query but extra join condition.

@gopidesupavan gopidesupavan force-pushed the adding-config-max-active-tasks-include-deferred branch from 42a1ae6 to 57477c0 Compare September 7, 2024 08:51
@gopidesupavan gopidesupavan force-pushed the adding-config-max-active-tasks-include-deferred branch from 57477c0 to 6810056 Compare September 29, 2024 11:10
ti_concurrency_query: Result = session.execute(
select(TI.task_id, TI.run_id, TI.dag_id, func.count("*"))
.where(TI.state.in_(states))
ti_concurrency_query: Result = (
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@uranusjr is this approach fine to run single query instead of two queries ? or any other suggestion please let me know.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:API Airflow's REST/HTTP API area:CLI area:Scheduler including HA (high availability) scheduler area:serialization
Projects
None yet
Development

Successfully merging this pull request may close these issues.

deferred / deferrable task are not take in account by max_active_tasks
2 participants