Skip to content
1 change: 1 addition & 0 deletions airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class BaseExecutor(LoggingMixin):
callback_sink: BaseCallbackSink | None = None

is_local: bool = False
change_sensor_mode_to_reschedule: bool = False

serve_logs: bool = False

Expand Down
1 change: 1 addition & 0 deletions airflow/executors/celery_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class CeleryKubernetesExecutor(LoggingMixin):
supports_ad_hoc_ti_run: bool = True
supports_pickling: bool = True
supports_sentry: bool = False
change_sensor_mode_to_reschedule: bool = False

callback_sink: BaseCallbackSink | None = None

Expand Down
1 change: 1 addition & 0 deletions airflow/executors/debug_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class DebugExecutor(BaseExecutor):
"""

_terminated = threading.Event()
change_sensor_mode_to_reschedule: bool = True

def __init__(self):
super().__init__()
Expand Down
1 change: 1 addition & 0 deletions airflow/executors/local_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class LocalKubernetesExecutor(LoggingMixin):
supports_ad_hoc_ti_run: bool = True
supports_pickling: bool = False
supports_sentry: bool = False
change_sensor_mode_to_reschedule: bool = False

callback_sink: BaseCallbackSink | None = None

Expand Down
7 changes: 5 additions & 2 deletions airflow/sensors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
AirflowSensorTimeout,
AirflowSkipException,
)
from airflow.executors.executor_loader import ExecutorLoader
from airflow.models.baseoperator import BaseOperator
from airflow.models.skipmixin import SkipMixin
from airflow.models.taskreschedule import TaskReschedule
Expand Down Expand Up @@ -257,11 +258,13 @@ def _get_next_poke_interval(

def prepare_for_execution(self) -> BaseOperator:
task = super().prepare_for_execution()

# Sensors in `poke` mode can block execution of DAGs when running
# with single process executor, thus we change the mode to`reschedule`
# to allow parallel task being scheduled and executed
if conf.get("core", "executor") == "DebugExecutor":
self.log.warning("DebugExecutor changes sensor mode to 'reschedule'.")
executor, _ = ExecutorLoader.import_default_executor_cls()
if executor.change_sensor_mode_to_reschedule:
self.log.warning("%s changes sensor mode to 'reschedule'.", executor.__name__)
task.mode = "reschedule"
return task

Expand Down
61 changes: 61 additions & 0 deletions tests/sensors/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,23 @@
import time_machine

from airflow.exceptions import AirflowException, AirflowRescheduleException, AirflowSensorTimeout
from airflow.executors.celery_executor import CeleryExecutor
from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
from airflow.executors.debug_executor import DebugExecutor
from airflow.executors.executor_constants import (
CELERY_EXECUTOR,
CELERY_KUBERNETES_EXECUTOR,
DASK_EXECUTOR,
DEBUG_EXECUTOR,
KUBERNETES_EXECUTOR,
LOCAL_EXECUTOR,
LOCAL_KUBERNETES_EXECUTOR,
SEQUENTIAL_EXECUTOR,
)
from airflow.executors.kubernetes_executor import KubernetesExecutor
from airflow.executors.local_executor import LocalExecutor
from airflow.executors.local_kubernetes_executor import LocalKubernetesExecutor
from airflow.executors.sequential_executor import SequentialExecutor
from airflow.models import TaskReschedule
from airflow.models.xcom import XCom
from airflow.operators.empty import EmptyOperator
Expand Down Expand Up @@ -675,6 +692,50 @@ def test_sensor_with_xcom_fails(self, make_sensor):
)
assert actual_xcom_value is None

@pytest.mark.parametrize(
"executor_cls_mode",
[
(CELERY_EXECUTOR, CeleryExecutor, "poke"),
(CELERY_KUBERNETES_EXECUTOR, CeleryKubernetesExecutor, "poke"),
(DEBUG_EXECUTOR, DebugExecutor, "reschedule"),
(KUBERNETES_EXECUTOR, KubernetesExecutor, "poke"),
(LOCAL_EXECUTOR, LocalExecutor, "poke"),
(LOCAL_KUBERNETES_EXECUTOR, LocalKubernetesExecutor, "poke"),
(SEQUENTIAL_EXECUTOR, SequentialExecutor, "poke"),
(DASK_EXECUTOR, DebugExecutor, "poke"),
],
ids=[
CELERY_EXECUTOR,
CELERY_KUBERNETES_EXECUTOR,
DEBUG_EXECUTOR,
KUBERNETES_EXECUTOR,
LOCAL_EXECUTOR,
LOCAL_KUBERNETES_EXECUTOR,
SEQUENTIAL_EXECUTOR,
DASK_EXECUTOR,
],
)
def test_prepare_for_execution(self, executor_cls_mode):
"""
Should change mode of the task to reschedule if using DEBUG_EXECUTOR
"""
executor_name, executor_cls, mode = executor_cls_mode
sensor = DummySensor(
task_id=SENSOR_OP,
return_value=None,
poke_interval=10,
timeout=60,
exponential_backoff=True,
max_wait=timedelta(seconds=30),
)
with patch("airflow.configuration.conf.get") as get, patch(
"airflow.executors.executor_loader.ExecutorLoader.load_executor"
) as load_executor:
get.return_value = executor_name
load_executor.return_value = executor_cls
task = sensor.prepare_for_execution()
assert task.mode == mode


@poke_mode_only
class DummyPokeOnlySensor(BaseSensorOperator):
Expand Down