-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding config max_active_tasks_include_deferred #41560
base: main
Are you sure you want to change the base?
Adding config max_active_tasks_include_deferred #41560
Conversation
Hi , can you please help me , which airflow_version should i set in the migration file? |
I am not sure exact version :), so have updated to 3.0.0 version, but please let me know if this version needs to be changed. |
This should be 3.0; we are not adding new features to 2.x at this point (unless they help the 2-3 migration; this one does not). |
Sure thank you @uranusjr :). |
airflow/jobs/scheduler_job_runner.py
Outdated
concurrency_map = self.__get_concurrency_maps(states=EXECUTION_STATES, session=session) | ||
concurrency_map_deferred = self.__get_concurrency_maps(states=DEFERRED_STATES, session=session) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be something like
if max_active_tasks_include_deferred:
concurrency_map = self.__get_concurrency_maps(states=DEFERRED_STATES | EXECUTION_STATE, session=session)
else:
concurrency_map = self.__get_concurrency_maps(states=EXECUTION_STATES, session=session)
to save one db query.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes agree to save one db query , but
I believe max_active_tasks_include_deferred
config is accessible only inside for loop from the task_instances_to_examine
airflow/airflow/jobs/scheduler_job_runner.py
Line 435 in 42a1ae6
for task_instance in task_instances_to_examine: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@uranusjr Can i have review please, is the updates are fine :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not quite sure how to apply the suggested approach because max_active_tasks_include_deferred is a DAG-level setting. A user might enable this option for only a few DAGs. Therefore, if we send states=DEFERRED_STATES | EXECUTION_STATE to __get_concurrency_maps, it could retrieve all tasks which are in deferred state, even for DAGs that do not have max_active_tasks_include_deferred enabled.
I have updated __get_concurrency_maps
to check for max_active_tasks_include_deferred. i believe which is one query but extra join condition.
42a1ae6
to
57477c0
Compare
57477c0
to
6810056
Compare
…_tasks_include_deferred is set to true
ti_concurrency_query: Result = session.execute( | ||
select(TI.task_id, TI.run_id, TI.dag_id, func.count("*")) | ||
.where(TI.state.in_(states)) | ||
ti_concurrency_query: Result = ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@uranusjr is this approach fine to run single query instead of two queries ? or any other suggestion please let me know.
Adding max_active_tasks_include_deferred.
max_active_tasks should take in account the deferred task when max_active_tasks_include_deferred=true
closes: #40528
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in newsfragments.