Fixes to how DebugExecutor handles sensors #28528
```python
is_mapped = isinstance(ti.task, MappedOperator)
if not is_mapped and not getattr(ti.task, "reschedule", False):
    is_debug_executor = conf.get("core", "executor") == "DebugExecutor"
```
I wonder if there's a better way to obtain this information. But since this pattern is used elsewhere in the code base, it's probably good enough.
@o-nikolas - maybe we could also add "is_debug" as part of AIP-51?
Yeah, it's not great to pull this from a global static place, but looking through what we have accessible here, I don't see anything that could help us, and injecting this all the way through here looks like a lot of pain.
I'm pushing a commit to use the constant that's available, making this piece of code a tiny bit better, I think?
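For illustration, the string-vs-constant pattern can be sketched with a plain `configparser` standing in for Airflow's global `conf` object. The constant name below is an assumption for the sketch (in Airflow the executor-name constants live in a dedicated module); the point is only that comparing against a named constant beats a bare string literal:

```python
import configparser

# Stand-in for Airflow's global conf object; the section/option names match
# the snippet above, the setup itself is purely illustrative.
conf = configparser.ConfigParser()
conf.read_string("[core]\nexecutor = DebugExecutor\n")

# Hypothetical named constant, as suggested in the comment above.
DEBUG_EXECUTOR = "DebugExecutor"

# The check from the diff, now comparing against the constant.
is_debug_executor = conf.get("core", "executor") == DEBUG_EXECUTOR
print(is_debug_executor)  # -> True
```

A typo in a bare string literal fails silently (the comparison is just always `False`), whereas a typo in a constant name raises a `NameError` immediately.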
Oh no! This is definitely new executor coupling being added, we should not add this as it is.
I apologize but I'm missing some context for this specific line change. Why do we need to know if this is the debug executor or not? Shouldn't we just need to know that the task is being rescheduled (by any executor)?
> maybe we could also add "is_debug" as part of AIP-51?
@potiuk this is covered by the single threaded case actually, which is more precisely what the issue is. Any single threaded executor needs to reschedule sensors otherwise the thread will be blocked when the sensor sleeps.
See 2c here: #27929
> @potiuk this is covered by the single threaded case actually, which is more precisely what the issue is. Any single threaded executor needs to reschedule sensors otherwise the thread will be blocked when the sensor sleeps.
Yep. You are right. Single-threaded is the right check here and we should use it here. I think being single-threaded is the only reason we are doing it.
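A toy model of the blocking problem described above (everything here is illustrative, not Airflow code): in poke mode the sensor sleeps inside the executor's only worker slot, while in reschedule mode it pokes once and returns, freeing the slot until the scheduler re-queues it:

```python
import time

def sensor_poke():
    # Pretend the condition the sensor waits for is never met.
    return False

def run_poke_mode(timeout=0.3, poke_interval=0.1):
    # Poke mode: the sensor occupies the single worker slot and sleeps
    # in-process between pokes, blocking every other task for `timeout`.
    start = time.monotonic()
    pokes = 0
    while time.monotonic() - start < timeout:
        if sensor_poke():
            return pokes
        pokes += 1
        time.sleep(poke_interval)  # the executor's only thread sleeps here
    return pokes

def run_reschedule_mode():
    # Reschedule mode: one poke, then the task exits; the scheduler
    # re-queues it for later, so the worker slot is free in between.
    return sensor_poke()

blocked_pokes = run_poke_mode()      # slot held for the whole timeout
freed = run_reschedule_mode()        # slot released immediately
print(blocked_pokes, freed)
```

With more than one thread, a sleeping poke-mode sensor is merely wasteful; with exactly one, it starves everything else, which is why single-threaded is the right property to check.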
OK, I just checked, and the SequentialExecutor has the same behavior, in that it does not wait to reschedule; it's just slower at it, so it sends fewer requests.
As such, it appears that checking for single-threaded executors is indeed the way to go.
I'm a bit confused by DebugExecutor. Didn't we deprecate it with the intention of removing it completely in Airflow 3?
#28861 (comment)
Yeah it was removed in the context of the dag.test() command in this commit. But AIP-47 compliant system tests still depend on that executor, so we can't kill it just yet without finding a replacement for that.
What's more, I think we don't even want to replace it for AIP-47. Why would we? Maybe we could rename it to SystemTestExecutor, but I think it does the job nicely?
A flag for executors that require reschedule mode was added in #28934. I've just merged it, so you can now make use of it here, @vandonr-amz.
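A hedged sketch of how such an AIP-51-style capability flag can be consulted. The attribute name below is inferred from the discussion and should be treated as an assumption, not Airflow's exact API; the idea is that the dep check asks the executor class directly instead of string-matching the configured executor name:

```python
class BaseExecutor:
    # Capability flag (name assumed for this sketch): executors that run
    # tasks in a single thread must force sensors into reschedule mode.
    change_sensor_mode_to_reschedule: bool = False

class DebugExecutor(BaseExecutor):
    change_sensor_mode_to_reschedule = True

def needs_reschedule(executor: BaseExecutor) -> bool:
    # No config lookups, no string comparisons: any executor (including
    # custom third-party ones) can opt in by setting the flag.
    return executor.change_sensor_mode_to_reschedule

print(needs_reschedule(DebugExecutor()))   # -> True
print(needs_reschedule(BaseExecutor()))    # -> False
```

This is the decoupling point raised earlier: the scheduler code no longer needs to know which concrete executors exist.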
Not sure, after all the discussions and other things happening, what would be the best course of action here. @o-nikolas - maybe you can make a call (especially in relation to AIP-51). Do you think we need some more discussion / feedback / brainstorming?
I think the course of action is to wait for an
Yeah, that's what I thought too. Wasn't sure though :)
Yupp, this is the current plan!
AIP-51 has been merged, so I'm using it!
o-nikolas left a comment
Changes look good with AIP-51 updates included 👍
Marking for 2.6.0, this relies on AIP-51
Current behavior of the Debug Executor with sensors: a reschedule date of `now + poke_interval` in the future is set (airflow/sensors/base.py, lines 263 to 265 in 13b43a7; see also airflow/ti_deps/deps/ready_to_reschedule.py, lines 47 to 52 in 681835a).
Because of this, the `poke` method is effectively called in a tight loop, hammering whichever API it's querying, eventually leading to trouble such as rate-limiting if the sensor waits long enough, which hampers debugging, the initial purpose of this executor. In this change, I propose that we add a special case for the debug executor in the `ready_to_reschedule` file, so that the task is always rescheduled when running with the debug executor.

While testing this change, I noticed that the executor itself also spins in a tight loop when it has no task to execute, leading to unnecessary resource usage and huge log files. With limited knowledge of how executors work, I'm proposing a poor man's fix for this here as well, where the executor sleeps for 500 ms if there are no tasks ready to be executed.

I think this time is short enough that humans won't have to wait too long for their tasks to be picked up when ready, and also long enough that the amount of logs is manageable and can be reasonably scrolled.
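The idle-sleep fix described above can be sketched as follows. This is a minimal stand-in for an executor's sync loop, not Airflow's actual code; the constant name and function signatures are illustrative:

```python
import time

TASK_POLL_SLEEP = 0.5  # 500 ms, the value proposed above (name illustrative)

def sync_loop(get_ready_tasks, execute, should_stop):
    # Instead of spinning in a tight loop when idle (burning CPU and
    # flooding the logs), sleep briefly whenever there is nothing to do.
    while not should_stop():
        tasks = get_ready_tasks()
        if not tasks:
            time.sleep(TASK_POLL_SLEEP)
            continue
        for task in tasks:
            execute(task)

# Tiny demo: two batches of tasks with an idle gap in between.
batches = [["t1"], [], ["t2"]]
executed = []
sync_loop(
    get_ready_tasks=lambda: batches.pop(0) if batches else [],
    execute=executed.append,
    should_stop=lambda: not batches,
)
print(executed)  # -> ['t1', 't2']
```

The trade-off is exactly the one stated above: a longer sleep means less log spam but slower pickup of newly ready tasks, and 500 ms sits comfortably below human reaction time while still capping the loop at two idle iterations per second.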