Closed
Description
Apache Airflow version
2.5.0
What happened
The EmrStepSensor is now decorated with poke_mode_only, which only allows the mode to be set to poke.
However, when running under the DebugExecutor, the sensor mode is set internally to reschedule, which raises a ValueError.
airflow/airflow/sensors/base.py
Lines 258 to 266 in 352d492

    def prepare_for_execution(self) -> BaseOperator:
        task = super().prepare_for_execution()
        # Sensors in `poke` mode can block execution of DAGs when running
        # with single process executor, thus we change the mode to`reschedule`
        # to allow parallel task being scheduled and executed
        if conf.get("core", "executor") == "DebugExecutor":
            self.log.warning("DebugExecutor changes sensor mode to 'reschedule'.")
            task.mode = "reschedule"
        return task

dags/signal_ai/data_platform/articles/offline_processing_articles_action_generation.py:9: in <module>
import signal_ai.data_platform.articles.offline_processing_articles_storage_chunking as offline_processing_articles_storage_chunking
dags/signal_ai/data_platform/articles/offline_processing_articles_storage_chunking.py:198: in <module>
emr_partitioning_group = _partition_realtime_output(article_document_type, FULL_RANGE_DAG_NAME)
dags/signal_ai/data_platform/articles/offline_processing_articles_storage_chunking.py:179: in _partition_realtime_output
return emr_partition_steps(
dags/signal_ai/data_platform/library/chunks.py:403: in emr_partition_steps
wait_for_emr_step = EmrStepSensor(
.venv/lib/python3.10/site-packages/airflow/models/baseoperator.py:390: in apply_defaults
result = func(self, **kwargs, default_args=default_args)
dags/signal_ai/data_platform/library/operators/emr_operators.py:144: in __init__
self.mode = mode
.venv/lib/python3.10/site-packages/airflow/models/baseoperator.py:990: in __setattr__
super().__setattr__(key, value)
_ = <Task(EmrStepSensor): partition_realtime_output.wait_for_emr_step>, value = 'reschedule'
    def mode_setter(_, value):
        if value != 'poke':
>           raise ValueError("cannot set mode to 'poke'.")
E           ValueError: cannot set mode to 'poke'.
.venv/lib/python3.10/site-packages/airflow/sensors/base.py:364: ValueError
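Additionally, the error message raised by poke_mode_only is incorrect: it says "cannot set mode to 'poke'", but poke is the only mode that can be set. The value actually being rejected here is 'reschedule'.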
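To make the conflict concrete, here is a minimal standalone sketch that does not import Airflow. `poke_only`, `FakeStepSensor` and `prepare_for_debug` are hypothetical names invented for illustration; they only mimic the shape of the setter shown in the traceback and the DebugExecutor branch quoted above. The error message is worded the way I would expect it to read, which is an assumption and not the current Airflow text.

```python
def poke_only(cls):
    """Simplified, hypothetical stand-in for a poke-only class decorator."""

    def mode_getter(self):
        # A poke-only sensor always reports 'poke'.
        return "poke"

    def mode_setter(self, value):
        if value != "poke":
            # Suggested wording: name the rejected value, not 'poke'.
            raise ValueError(
                f"Cannot set mode to {value!r}. Only 'poke' is acceptable."
            )

    # Replace the plain attribute with a guarded property.
    cls.mode = property(mode_getter, mode_setter)
    return cls


@poke_only
class FakeStepSensor:
    """Hypothetical sensor standing in for any poke-only sensor."""

    def __init__(self, mode="poke"):
        self.mode = mode


def prepare_for_debug(sensor):
    """Mimics the DebugExecutor branch, which forces 'reschedule'."""
    sensor.mode = "reschedule"
    return sensor
```

Calling `prepare_for_debug(FakeStepSensor())` raises the ValueError, which is the same collision between the decorator and the DebugExecutor override described in this report.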
What you think should happen instead
The decorator should allow the mode to be set to reschedule when running under the DebugExecutor.
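One possible shape for that behaviour, as a rough sketch and not a proposed patch: a poke-only decorator that also accepts reschedule when the configured executor is DebugExecutor. `poke_only_unless_debug` and `current_executor` are hypothetical names. Reading the executor from the `AIRFLOW__CORE__EXECUTOR` environment variable is an assumption made to keep the example standalone; inside Airflow the check would presumably use `conf.get("core", "executor")` as in the snippet quoted earlier.

```python
import os


def current_executor():
    # Assumption: standalone substitute for conf.get("core", "executor").
    return os.environ.get("AIRFLOW__CORE__EXECUTOR", "")


def poke_only_unless_debug(get_executor=current_executor):
    """Hypothetical decorator factory: poke-only, except under DebugExecutor."""

    def decorate(cls):
        def mode_getter(self):
            # Default to 'poke' until a permitted value has been stored.
            return getattr(self, "_mode", "poke")

        def mode_setter(self, value):
            debug = get_executor() == "DebugExecutor"
            if value == "poke" or (debug and value == "reschedule"):
                self._mode = value
                return
            raise ValueError(
                f"Cannot set mode to {value!r}. Only 'poke' is acceptable."
            )

        cls.mode = property(mode_getter, mode_setter)
        return cls

    return decorate
```

The executor lookup is injected as a parameter so the behaviour can be exercised without touching global configuration. Whether running a poke-only sensor in reschedule mode is actually safe depends on why the sensor was restricted in the first place, so an alternative would be for the DebugExecutor branch to skip the override for such sensors.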
How to reproduce
Create an EmrStepSensor in a DAG, then run the DAG with the DebugExecutor.
Operating System
Ubuntu 20.04.5 LTS (Focal Fossa)
Versions of Apache Airflow Providers
apache-airflow-providers-amazon==7.0.0
Deployment
Other
Deployment details
No response
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct