Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions airflow/providers/amazon/aws/sensors/emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from airflow.providers.amazon.aws.hooks.emr import EmrContainerHook, EmrHook, EmrServerlessHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.links.emr import EmrLogsLink
from airflow.sensors.base import BaseSensorOperator, poke_mode_only
from airflow.sensors.base import BaseSensorOperator

if TYPE_CHECKING:
from airflow.utils.context import Context
Expand Down Expand Up @@ -451,7 +451,6 @@ def failure_message_from_response(response: dict[str, Any]) -> str | None:
return None


@poke_mode_only
class EmrStepSensor(EmrBaseSensor):
"""
Asks for the state of the step until it reaches any of the target states.
Expand Down
9 changes: 9 additions & 0 deletions docs/apache-airflow-providers-amazon/operators/emr/emr.rst
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,15 @@ To monitor the state of an EMR job flow you can use
Wait on an Amazon EMR step state
================================

To monitor the state of an EMR job step you can use
:class:`~airflow.providers.amazon.aws.sensors.emr.EmrStepSensor`.

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_emr_step]
:end-before: [END howto_sensor_emr_step]

Reference
---------

Expand Down
2 changes: 0 additions & 2 deletions tests/always/test_project_structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,8 +405,6 @@ class TestAmazonProviderProjectStructure(ExampleCoverageTest):
"airflow.providers.amazon.aws.transfers.exasol_to_s3.ExasolToS3Operator",
# Glue Catalog sensor difficult to test
"airflow.providers.amazon.aws.sensors.glue_catalog_partition.GlueCatalogPartitionSensor",
# EMR Step sensor difficult to test, see: https://github.com/apache/airflow/pull/27286
"airflow.providers.amazon.aws.sensors.emr.EmrStepSensor",
}

DEPRECATED_CLASSES = {
Expand Down
16 changes: 15 additions & 1 deletion tests/system/providers/amazon/aws/example_emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
EmrTerminateJobFlowOperator,
)
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor
from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor, EmrStepSensor
from airflow.utils.trigger_rule import TriggerRule
from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder

Expand Down Expand Up @@ -117,6 +117,11 @@ def delete_security_config(config_name: str):
)


@task
def get_step_id(step_ids: list):
return step_ids[0]


sys_test_context_task = SystemTestContextBuilder().add_variable(EXECUTION_ROLE_ARN_KEY).build()

with DAG(
Expand Down Expand Up @@ -164,6 +169,14 @@ def delete_security_config(config_name: str):
)
# [END howto_operator_emr_add_steps]

# [START howto_sensor_emr_step]
wait_for_step = EmrStepSensor(
task_id="wait_for_step",
job_flow_id=create_job_flow.output,
step_id=get_step_id(add_steps.output),
)
# [END howto_sensor_emr_step]

# [START howto_operator_emr_terminate_job_flow]
remove_cluster = EmrTerminateJobFlowOperator(
task_id="remove_cluster",
Expand Down Expand Up @@ -195,6 +208,7 @@ def delete_security_config(config_name: str):
create_job_flow,
modify_cluster,
add_steps,
wait_for_step,
# TEST TEARDOWN
remove_cluster,
check_job_flow,
Expand Down