EMR Step sensor incompatible with DebugExecutor #28861

@Limess

Apache Airflow version

2.5.0

What happened

The EmrStepSensor is now decorated with poke_mode_only, which only allows the mode to be set to poke.

However, when the DAG runs under the DebugExecutor, the base sensor's prepare_for_execution internally switches the mode to reschedule, so the decorator's mode setter raises a ValueError.

    def prepare_for_execution(self) -> BaseOperator:
        task = super().prepare_for_execution()
        # Sensors in `poke` mode can block execution of DAGs when running
        # with single process executor, thus we change the mode to `reschedule`
        # to allow parallel task being scheduled and executed
        if conf.get("core", "executor") == "DebugExecutor":
            self.log.warning("DebugExecutor changes sensor mode to 'reschedule'.")
            task.mode = "reschedule"
        return task

dags/signal_ai/data_platform/articles/offline_processing_articles_action_generation.py:9: in <module>
    import signal_ai.data_platform.articles.offline_processing_articles_storage_chunking as offline_processing_articles_storage_chunking
dags/signal_ai/data_platform/articles/offline_processing_articles_storage_chunking.py:198: in <module>
    emr_partitioning_group = _partition_realtime_output(article_document_type, FULL_RANGE_DAG_NAME)
dags/signal_ai/data_platform/articles/offline_processing_articles_storage_chunking.py:179: in _partition_realtime_output
    return emr_partition_steps(
dags/signal_ai/data_platform/library/chunks.py:403: in emr_partition_steps
    wait_for_emr_step = EmrStepSensor(
.venv/lib/python3.10/site-packages/airflow/models/baseoperator.py:390: in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
dags/signal_ai/data_platform/library/operators/emr_operators.py:144: in __init__
    self.mode = mode
.venv/lib/python3.10/site-packages/airflow/models/baseoperator.py:990: in __setattr__
    super().__setattr__(key, value)

_ = <Task(EmrStepSensor): partition_realtime_output.wait_for_emr_step>, value = 'reschedule'

    def mode_setter(_, value):
        if value != 'poke':
>           raise ValueError("cannot set mode to 'poke'.")
E           ValueError: cannot set mode to 'poke'.

.venv/lib/python3.10/site-packages/airflow/sensors/base.py:364: ValueError

Additionally, the error message raised by poke_mode_only is incorrect: it says "cannot set mode to 'poke'", when poke is in fact the only mode that can be set.
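
To make the interaction easier to see without an Airflow install, here is a minimal standalone sketch of the failure. It is not Airflow's code: `FakeBaseSensor`, `FakeStepSensor` and `try_prepare` are hypothetical stand-ins, the executor name is passed as an argument instead of being read from `conf`, and the decorator is a simplified version that only reproduces the setter shown in the traceback.

```python
def poke_mode_only(cls):
    """Simplified stand-in for the decorator: pin `mode` to 'poke'."""

    def mode_getter(_):
        return "poke"

    def mode_setter(_, value):
        if value != "poke":
            # Message copied verbatim from the traceback in this report.
            raise ValueError("cannot set mode to 'poke'.")

    cls.mode = property(mode_getter, mode_setter)
    return cls


class FakeBaseSensor:
    """Hypothetical stand-in for the base sensor class."""

    def __init__(self, mode="poke"):
        self.mode = mode

    def prepare_for_execution(self, executor):
        # Mirrors the quoted snippet: the DebugExecutor forces 'reschedule'.
        if executor == "DebugExecutor":
            self.mode = "reschedule"
        return self


@poke_mode_only
class FakeStepSensor(FakeBaseSensor):
    pass


def try_prepare(executor):
    """Return a short description of what happens under `executor`."""
    sensor = FakeStepSensor()
    try:
        sensor.prepare_for_execution(executor)
    except ValueError as exc:
        return f"ValueError: {exc}"
    return f"ok, mode={sensor.mode}"


if __name__ == "__main__":
    print(try_prepare("LocalExecutor"))
    print(try_prepare("DebugExecutor"))
```

Under these assumptions, any executor other than the DebugExecutor leaves the sensor in poke mode, while the DebugExecutor path hits the setter and fails with the misleading message.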

What you think should happen instead

The decorator should allow the mode to be set to reschedule when running under the DebugExecutor.
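
One possible shape for that behaviour, as a rough sketch only and not a proposed patch: `poke_mode_only_debug_aware`, `DemoSensor`, `set_mode` and the `get_executor` callable are all hypothetical names, and a real change would presumably read the executor from Airflow's configuration rather than from a callable passed in. The sketch also uses an error message that names the rejected value.

```python
def poke_mode_only_debug_aware(get_executor):
    """Hypothetical decorator: only 'poke' is allowed, except that
    'reschedule' is tolerated when the executor is the DebugExecutor."""

    def decorate(cls):
        def mode_getter(self):
            return getattr(self, "_mode", "poke")

        def mode_setter(self, value):
            debug_override = (
                value == "reschedule" and get_executor() == "DebugExecutor"
            )
            if value != "poke" and not debug_override:
                raise ValueError(
                    f"Cannot set mode to '{value}'. Only 'poke' is acceptable"
                )
            self._mode = value

        cls.mode = property(mode_getter, mode_setter)
        return cls

    return decorate


# Stand-in for the configured executor (assumption: a real implementation
# would look this up in the Airflow config instead).
current_executor = {"name": "LocalExecutor"}


@poke_mode_only_debug_aware(lambda: current_executor["name"])
class DemoSensor:
    def __init__(self):
        self.mode = "poke"


def set_mode(executor, mode):
    """Try to set `mode` under `executor`; return the mode or the error text."""
    current_executor["name"] = executor
    sensor = DemoSensor()
    try:
        sensor.mode = mode
    except ValueError as exc:
        return str(exc)
    return sensor.mode
```

With this sketch, `set_mode("DebugExecutor", "reschedule")` succeeds, while the same assignment under any other executor is still rejected.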

How to reproduce

Create an EmrStepSensor in a DAG, then run the DAG with the DebugExecutor.

Operating System

Ubuntu 20.04.5 LTS (Focal Fossa)

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==7.0.0

Deployment

Other

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
