@vandonr-amz
Contributor

@vandonr-amz vandonr-amz commented Dec 21, 2022

Current behavior of the Debug Executor with sensors:

  • When the sensor code gets executed, the (copy of the) task is in "reschedule" mode, so the sensor releases it and reschedules it for now + poke_interval:
    if conf.get("core", "executor") == "DebugExecutor":
        self.log.warning("DebugExecutor changes sensor mode to 'reschedule'.")
        task.mode = "reschedule"
  • The original task object stays in "poke" mode, so when the rescheduling event is received, it isn't handled properly, and the task is rescheduled for immediate execution (instead of being rescheduled in the future). This takes place here:
    if not is_mapped and not getattr(ti.task, "reschedule", False):
        # Mapped sensors don't have the reschedule property (it can only
        # be calculated after unmapping), so we don't check them here.
        # They are handled below by checking TaskReschedule instead.
        yield self._passing_status(reason="Task is not in reschedule mode.")
        return

Because of this, the poke method is effectively called in a tight loop, hammering whichever API it's querying. If the sensor waits long enough, this eventually leads to trouble such as rate-limiting, which hampers debugging, the very purpose of this executor.
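
To make the failure mode concrete, here is a minimal sketch on a simulated clock. It is not Airflow code: the function name `count_pokes`, the 100 ms loop overhead, and the 10 s poke interval are all assumptions chosen for illustration. It counts how many times `poke` would run during a 60 s wait when the reschedule date is ignored versus honored.

```python
def count_pokes(
    wait_ms: int,
    poke_interval_ms: int,
    honor_reschedule: bool,
    loop_overhead_ms: int = 100,  # assumed cost of one executor loop iteration
) -> int:
    """Count poke calls until the sensor's condition is met.

    Hypothetical model: the condition becomes true once `wait_ms` have
    elapsed. When the reschedule date is ignored, the task is picked up
    again after only `loop_overhead_ms`.
    """
    now_ms = 0
    pokes = 0
    while True:
        pokes += 1  # each poke queries the external API once
        if now_ms >= wait_ms:
            return pokes
        # Advance the simulated clock to the next execution of the task.
        now_ms += poke_interval_ms if honor_reschedule else loop_overhead_ms


honored = count_pokes(60_000, 10_000, honor_reschedule=True)
ignored = count_pokes(60_000, 10_000, honor_reschedule=False)
print(f"honored: {honored} pokes, ignored: {ignored} pokes")
```

Under these assumed numbers the API is queried roughly two orders of magnitude more often when the reschedule date is ignored; the real ratio depends on how fast the executor loop actually spins.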

In this change, I propose that we add a special case for the debug executor in the ready_to_reschedule file to always reschedule when running with the debug executor.
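
The proposed special case can be sketched as a plain function. This is a simplified stand-in, not the actual dependency class: `reschedule_dep_passes` and its parameters are hypothetical names, and the real check yields status objects rather than returning a boolean.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def reschedule_dep_passes(
    now: datetime,
    next_reschedule_date: Optional[datetime],
    task_is_reschedule_mode: bool,
    is_debug_executor: bool,
) -> bool:
    """Return True if the task may run now (hypothetical simplification)."""
    if not task_is_reschedule_mode and not is_debug_executor:
        # Before the change, a "poke" task always took this early exit,
        # even when the debug executor had rescheduled its copy.
        return True
    if next_reschedule_date is None:
        # Nothing was rescheduled, so there is nothing to wait for.
        return True
    # Otherwise wait until the requested reschedule date is reached.
    return now >= next_reschedule_date


now = datetime(2022, 12, 21, 12, 0, tzinfo=timezone.utc)
later = now + timedelta(seconds=60)
print(reschedule_dep_passes(now, later, False, is_debug_executor=False))
print(reschedule_dep_passes(now, later, False, is_debug_executor=True))
```

The first call models the old behavior (the task passes immediately); the second models the special case, where the task waits for its reschedule date.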

While testing this change, I noticed that the executor itself also spins in a tight loop when it has no task to execute, leading to unnecessary resource usage and huge log files. With limited knowledge of how executors work, I'm proposing a poor man's fix for this here as well: the executor sleeps for 500 ms if no tasks are ready to be executed.
I think this time is short enough that humans won't have to wait too long for their tasks to be picked up when ready, and also long enough that the amount of logs is manageable and can be reasonably scrolled.
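
The idle sleep amounts to something like the loop below. This is only a sketch under assumptions: `run_loop`, `IDLE_SLEEP_S`, and the deque-based queue are illustrative names, not the executor's real internals. The `sleep` parameter is injectable so the behavior can be checked without actually waiting.

```python
import time
from collections import deque
from typing import Callable, Deque

IDLE_SLEEP_S = 0.5  # the 500 ms pause proposed above


def run_loop(
    ready: Deque[Callable[[], None]],
    max_iterations: int,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Run ready tasks; pause briefly whenever nothing is ready.

    Returns how many times the loop went idle (hypothetical helper).
    """
    idle_count = 0
    for _ in range(max_iterations):
        if ready:
            task = ready.popleft()
            task()
        else:
            # No task is ready: back off instead of spinning and flooding the log.
            sleep(IDLE_SLEEP_S)
            idle_count += 1
    return idle_count


recorded = []  # collects requested sleep durations instead of really sleeping
idle = run_loop(deque([lambda: None]), max_iterations=3, sleep=recorded.append)
print(idle, recorded)
```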

@boring-cyborg boring-cyborg bot added area:core-operators area:Scheduler including HA (high availability) scheduler labels Dec 21, 2022
Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
vandonr-amz and others added 3 commits January 3, 2023 10:21
Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>

is_mapped = isinstance(ti.task, MappedOperator)
if not is_mapped and not getattr(ti.task, "reschedule", False):
is_debug_executor = conf.get("core", "executor") == "DebugExecutor"
Member

I wonder if there’s a better way to obtain this information. But since this pattern is used elsewhere in the code base, it’s probably good enough.

Member

@o-nikolas - maybe we could also add "is_debug" as part of AIP-51 ?

Contributor Author

Yeah not great to pull this from a global static place, but looking through what we have accessible here, I don't see anything that could help us. And injecting this all the way through here looks like a lot of pain.

I'm pushing a commit to use the constant that's available, making this piece of code a tiny bit better I think ?

Contributor

Oh no! This is definitely new executor coupling being added, we should not add this as it is.

I apologize but I'm missing some context for this specific line change. Why do we need to know if this is the debug executor or not? Shouldn't we just need to know that the task is being rescheduled (by any executor)?

> maybe we could also add "is_debug" as part of AIP-51?

@potiuk this is covered by the single threaded case actually, which is more precisely what the issue is. Any single threaded executor needs to reschedule sensors otherwise the thread will be blocked when the sensor sleeps.

See 2c here: #27929

Member

> @potiuk this is covered by the single threaded case actually, which is more precisely what the issue is. Any single threaded executor needs to reschedule sensors otherwise the thread will be blocked when the sensor sleeps.

Yep. You are right. Single-threaded is the right check here and we should use it here. I think being single-threaded is the only reason we are doing it.

Contributor Author

OK, I just checked, and the SequentialExecutor has the same behavior, in that it does not wait to reschedule; it's just slower at it, so it spams fewer requests.
As such, it appears that checking for single-threaded executors is indeed the way to go.

Contributor

I'm a bit confused by DebugExecutor. Didn't we deprecate it with the intention of removing it completely in Airflow 3?
#28861 (comment)

Contributor

Yeah it was removed in the context of the dag.test() command in this commit. But AIP-47 compliant system tests still depend on that executor, so we can't kill it just yet without finding a replacement for that.

Member

What's more, I think we possibly do not even want to replace it for AIP-47. Why would we? Maybe we can rename it to SystemTestExecutor, but I think it does the job nicely?

Contributor

A flag for executors that require reschedule mode has been added in #28934. I've just merged it, so you can now make use of it here @vandonr-amz
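
For readers following along, a rough sketch of how such a class-level flag could replace the executor name check. All names here are hypothetical stand-ins: `requires_reschedule_mode`, the two executor classes, and `skip_reschedule_check` are assumptions for illustration, and the attribute actually added in #28934 may be named differently.

```python
class BaseExecutorSketch:
    """Stand-in for an executor base class (hypothetical, not Airflow's)."""

    # Assumed attribute name; executors opt in by overriding it.
    requires_reschedule_mode: bool = False


class DebugLikeExecutor(BaseExecutorSketch):
    """Runs tasks in a single thread, so sensors must be rescheduled."""

    requires_reschedule_mode = True


class ParallelLikeExecutor(BaseExecutorSketch):
    """Keeps the default: sensors may block in 'poke' mode."""


def skip_reschedule_check(task_is_reschedule_mode: bool, executor_cls: type) -> bool:
    """Return True when the dependency may pass without checking reschedule dates."""
    return not task_is_reschedule_mode and not executor_cls.requires_reschedule_mode


print(skip_reschedule_check(False, ParallelLikeExecutor))
print(skip_reschedule_check(False, DebugLikeExecutor))
```

The point of the design is that the dependency code asks the executor class about a capability instead of comparing a configuration string against a specific executor name, which avoids the coupling raised earlier in this thread.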

@potiuk
Member

potiuk commented Jan 19, 2023

Not sure - after all the discussions and other things happening what would be the best course of action here. @o-nikolas - maybe you can make a call (especially in relation to AIP-51)? Do you think we need some more discussion / feedback / brainstorming?

@vandonr-amz
Contributor Author

> Not sure - after all the discussions and other things happening what would be the best course of action here.

I think the course of action is to wait for an is_single_threaded_executor to be available through the works of @utkarsharma2 , and use that instead of the current check on debug executor ?

@potiuk
Member

potiuk commented Jan 19, 2023

> > Not sure - after all the discussions and other things happening what would be the best course of action here.
>
> I think the course of action is to wait for an is_single_threaded_executor to be available through the works of @utkarsharma2 , and use that instead of the current check on debug executor ?

Yeah. that's what I thought too. wasn't sure though :)

@o-nikolas
Contributor

> > > Not sure - after all the discussions and other things happening what would be the best course of action here.
> >
> > I think the course of action is to wait for an is_single_threaded_executor to be available through the works of @utkarsharma2 , and use that instead of the current check on debug executor ?
>
> Yeah. that's what I thought too. wasn't sure though :)

Yupp, this is the current plan!

@vandonr-amz
Contributor Author

AIP-51 has been merged, so I'm using it !
Should be good to go now !

Contributor

@o-nikolas o-nikolas left a comment

Changes look good with AIP-51 updates included 👍

@pierrejeambrun pierrejeambrun added the type:bug-fix Changelog: Bug Fixes label Feb 27, 2023
@pierrejeambrun
Member

Marking for 2.6.0, this relies on AIP-51
