Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions airflow/providers/amazon/aws/operators/emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -830,8 +830,8 @@ def execute(self, context: Context) -> str | None:
trigger=EmrCreateJobFlowTrigger(
job_flow_id=self._job_flow_id,
aws_conn_id=self.aws_conn_id,
poll_interval=self.waiter_delay,
max_attempts=self.waiter_max_attempts,
waiter_delay=self.waiter_delay,
waiter_max_attempts=self.waiter_max_attempts,
),
method_name="execute_complete",
# timeout is set to ensure that if a trigger dies, the timeout does not restart
Expand Down
19 changes: 0 additions & 19 deletions tests/deprecations_ignore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -246,25 +246,6 @@
- tests/providers/amazon/aws/operators/test_ecs.py::TestEcsRunTaskOperator::test_wait_end_tasks
- tests/providers/amazon/aws/operators/test_ecs.py::TestEcsRunTaskOperator::test_with_defer
- tests/providers/amazon/aws/operators/test_eks.py::TestEksPodOperator::test_on_finish_action_handler
- tests/providers/amazon/aws/operators/test_emr_create_job_flow.py::TestEmrCreateJobFlowOperator::test_create_job_flow_deferrable
- tests/providers/amazon/aws/operators/test_emr_create_job_flow.py::TestEmrCreateJobFlowOperator::test_execute_returns_job_id
- tests/providers/amazon/aws/operators/test_emr_create_job_flow.py::TestEmrCreateJobFlowOperator::test_execute_with_wait
- tests/providers/amazon/aws/operators/test_emr_create_job_flow.py::TestEmrCreateJobFlowOperator::test_init
- tests/providers/amazon/aws/operators/test_emr_create_job_flow.py::TestEmrCreateJobFlowOperator::test_render_template
- tests/providers/amazon/aws/operators/test_emr_create_job_flow.py::TestEmrCreateJobFlowOperator::test_render_template_from_file
- tests/providers/amazon/aws/operators/test_emr_notebook_execution.py::TestEmrStartNotebookExecutionOperator::test_start_notebook_execution_http_code_fail
- tests/providers/amazon/aws/operators/test_emr_notebook_execution.py::TestEmrStartNotebookExecutionOperator::test_start_notebook_execution_no_wait_for_completion
- tests/providers/amazon/aws/operators/test_emr_notebook_execution.py::TestEmrStartNotebookExecutionOperator::test_start_notebook_execution_wait_for_completion
- tests/providers/amazon/aws/operators/test_emr_notebook_execution.py::TestEmrStartNotebookExecutionOperator::test_start_notebook_execution_wait_for_completion_fail_state
- tests/providers/amazon/aws/operators/test_emr_notebook_execution.py::TestEmrStartNotebookExecutionOperator::test_start_notebook_execution_wait_for_completion_multiple_attempts
- tests/providers/amazon/aws/operators/test_emr_notebook_execution.py::TestStopEmrNotebookExecutionOperator::test_stop_notebook_execution
- tests/providers/amazon/aws/operators/test_emr_notebook_execution.py::TestStopEmrNotebookExecutionOperator::test_stop_notebook_execution_wait_for_completion
- tests/providers/amazon/aws/operators/test_emr_notebook_execution.py::TestStopEmrNotebookExecutionOperator::test_stop_notebook_execution_wait_for_completion_fail_state
- tests/providers/amazon/aws/operators/test_emr_notebook_execution.py::TestStopEmrNotebookExecutionOperator::test_stop_notebook_execution_wait_for_completion_multiple_attempts
- tests/providers/amazon/aws/operators/test_emr_notebook_execution.py::TestStopEmrNotebookExecutionOperator::test_stop_notebook_execution_waiter_config
- tests/providers/amazon/aws/operators/test_emr_serverless.py::TestEmrServerlessCreateApplicationOperator::test_create_application_waiter_params
- tests/providers/amazon/aws/operators/test_emr_serverless.py::TestEmrServerlessDeleteOperator::test_delete_application_waiter_params
- tests/providers/amazon/aws/operators/test_emr_serverless.py::TestEmrServerlessStartJobOperator::test_start_job_waiter_params
- tests/providers/amazon/aws/secrets/test_secrets_manager.py::TestSecretsManagerBackend::test_get_conn_value_broken_field_mode
- tests/providers/amazon/aws/secrets/test_secrets_manager.py::TestSecretsManagerBackend::test_get_conn_value_broken_field_mode_extra_words_added
- tests/providers/amazon/aws/secrets/test_secrets_manager.py::TestSecretsManagerBackend::test_get_connection_broken_field_mode_extra_allows_nested_json
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,22 +284,22 @@ def test_stop_notebook_execution_wait_for_completion_multiple_attempts(self, moc
@mock.patch.object(EmrHook, "conn")
def test_stop_notebook_execution_waiter_config(self, mock_conn, mock_waiter, _):
test_execution_id = "test-execution-id"
countdown = 400
waiter_max_attempts = 35
delay = 12

op = EmrStopNotebookExecutionOperator(
task_id="test-id",
notebook_execution_id=test_execution_id,
wait_for_completion=True,
waiter_countdown=countdown,
waiter_check_interval_seconds=delay,
waiter_max_attempts=waiter_max_attempts,
waiter_delay=delay,
)

op.execute(None)
mock_conn.stop_notebook_execution.assert_called_once_with(NotebookExecutionId=test_execution_id)
mock_waiter.assert_called_once_with(
mock.ANY,
NotebookExecutionId=test_execution_id,
WaiterConfig={"Delay": delay, "MaxAttempts": countdown // delay},
WaiterConfig={"Delay": delay, "MaxAttempts": waiter_max_attempts},
)
assert_expected_waiter_type(mock_waiter, "notebook_stopped")
164 changes: 115 additions & 49 deletions tests/providers/amazon/aws/operators/test_emr_serverless.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import pytest
from botocore.exceptions import WaiterError

from airflow.exceptions import AirflowException, TaskDeferred
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, TaskDeferred
from airflow.providers.amazon.aws.hooks.emr import EmrServerlessHook
from airflow.providers.amazon.aws.links.emr import (
EmrServerlessCloudWatchLogsLink,
Expand Down Expand Up @@ -336,28 +336,51 @@ def test_application_in_failure_state(self, mock_conn, mock_get_waiter):
)

@pytest.mark.parametrize(
"waiter_delay, waiter_max_attempts, waiter_countdown, waiter_check_interval_seconds, expected",
"waiter_delay, waiter_max_attempts, waiter_countdown, waiter_check_interval_seconds, expected, warning",
[
(NOTSET, NOTSET, NOTSET, NOTSET, [60, 25]),
(30, 10, NOTSET, NOTSET, [30, 10]),
(NOTSET, NOTSET, 30 * 15, 15, [15, 30]),
(10, 20, 30, 40, [10, 20]),
(NOTSET, NOTSET, NOTSET, NOTSET, [60, 25], False),
(30, 10, NOTSET, NOTSET, [30, 10], False),
(NOTSET, NOTSET, 30 * 15, 15, [15, 30], True),
(10, 20, 30, 40, [10, 20], True),
],
)
def test_create_application_waiter_params(
self, waiter_delay, waiter_max_attempts, waiter_countdown, waiter_check_interval_seconds, expected
self,
waiter_delay,
waiter_max_attempts,
waiter_countdown,
waiter_check_interval_seconds,
expected,
warning,
):
operator = EmrServerlessCreateApplicationOperator(
task_id=task_id,
release_label=release_label,
job_type=job_type,
client_request_token=client_request_token,
config=config,
waiter_delay=waiter_delay,
waiter_max_attempts=waiter_max_attempts,
waiter_countdown=waiter_countdown,
waiter_check_interval_seconds=waiter_check_interval_seconds,
)
if warning:
with pytest.warns(
AirflowProviderDeprecationWarning,
match="The parameter waiter_.* has been deprecated to standardize naming conventions. Please use waiter_.* instead. .*In the future this will default to None and defer to the waiter's default value.",
):
operator = EmrServerlessCreateApplicationOperator(
task_id=task_id,
release_label=release_label,
job_type=job_type,
client_request_token=client_request_token,
config=config,
waiter_delay=waiter_delay,
waiter_max_attempts=waiter_max_attempts,
waiter_countdown=waiter_countdown,
waiter_check_interval_seconds=waiter_check_interval_seconds,
)
else:
operator = EmrServerlessCreateApplicationOperator(
task_id=task_id,
release_label=release_label,
job_type=job_type,
client_request_token=client_request_token,
config=config,
waiter_delay=waiter_delay,
waiter_max_attempts=waiter_max_attempts,
waiter_countdown=waiter_countdown,
waiter_check_interval_seconds=waiter_check_interval_seconds,
)
assert operator.wait_for_completion is True
assert operator.waiter_delay == expected[0]
assert operator.waiter_max_attempts == expected[1]
Expand Down Expand Up @@ -755,28 +778,51 @@ def test_cancel_job_run(self, mock_conn):
)

@pytest.mark.parametrize(
"waiter_delay, waiter_max_attempts, waiter_countdown, waiter_check_interval_seconds, expected",
"waiter_delay, waiter_max_attempts, waiter_countdown, waiter_check_interval_seconds, expected, warning",
[
(NOTSET, NOTSET, NOTSET, NOTSET, [60, 25]),
(30, 10, NOTSET, NOTSET, [30, 10]),
(NOTSET, NOTSET, 30 * 15, 15, [15, 30]),
(10, 20, 30, 40, [10, 20]),
(NOTSET, NOTSET, NOTSET, NOTSET, [60, 25], False),
(30, 10, NOTSET, NOTSET, [30, 10], False),
(NOTSET, NOTSET, 30 * 15, 15, [15, 30], True),
(10, 20, 30, 40, [10, 20], True),
],
)
def test_start_job_waiter_params(
self, waiter_delay, waiter_max_attempts, waiter_countdown, waiter_check_interval_seconds, expected
self,
waiter_delay,
waiter_max_attempts,
waiter_countdown,
waiter_check_interval_seconds,
expected,
warning,
):
operator = EmrServerlessStartJobOperator(
task_id=task_id,
application_id=application_id,
execution_role_arn=execution_role_arn,
job_driver=job_driver,
configuration_overrides=configuration_overrides,
waiter_delay=waiter_delay,
waiter_max_attempts=waiter_max_attempts,
waiter_countdown=waiter_countdown,
waiter_check_interval_seconds=waiter_check_interval_seconds,
)
if warning:
with pytest.warns(
AirflowProviderDeprecationWarning,
match="The parameter waiter_.* has been deprecated to standardize naming conventions. Please use waiter_.* instead. .*In the future this will default to None and defer to the waiter's default value.",
):
operator = EmrServerlessStartJobOperator(
task_id=task_id,
application_id=application_id,
execution_role_arn=execution_role_arn,
job_driver=job_driver,
configuration_overrides=configuration_overrides,
waiter_delay=waiter_delay,
waiter_max_attempts=waiter_max_attempts,
waiter_countdown=waiter_countdown,
waiter_check_interval_seconds=waiter_check_interval_seconds,
)
else:
operator = EmrServerlessStartJobOperator(
task_id=task_id,
application_id=application_id,
execution_role_arn=execution_role_arn,
job_driver=job_driver,
configuration_overrides=configuration_overrides,
waiter_delay=waiter_delay,
waiter_max_attempts=waiter_max_attempts,
waiter_countdown=waiter_countdown,
waiter_check_interval_seconds=waiter_check_interval_seconds,
)
assert operator.wait_for_completion is True
assert operator.waiter_delay == expected[0]
assert operator.waiter_max_attempts == expected[1]
Expand Down Expand Up @@ -1209,25 +1255,45 @@ def test_delete_application_failed_deletion(self, mock_conn, mock_get_waiter):
mock_conn.delete_application.assert_called_once_with(applicationId=application_id_delete_operator)

@pytest.mark.parametrize(
"waiter_delay, waiter_max_attempts, waiter_countdown, waiter_check_interval_seconds, expected",
"waiter_delay, waiter_max_attempts, waiter_countdown, waiter_check_interval_seconds, expected, warning",
[
(NOTSET, NOTSET, NOTSET, NOTSET, [60, 25]),
(30, 10, NOTSET, NOTSET, [30, 10]),
(NOTSET, NOTSET, 30 * 15, 15, [15, 30]),
(10, 20, 30, 40, [10, 20]),
(NOTSET, NOTSET, NOTSET, NOTSET, [60, 25], False),
(30, 10, NOTSET, NOTSET, [30, 10], False),
(NOTSET, NOTSET, 30 * 15, 15, [15, 30], True),
(10, 20, 30, 40, [10, 20], True),
],
)
def test_delete_application_waiter_params(
self, waiter_delay, waiter_max_attempts, waiter_countdown, waiter_check_interval_seconds, expected
self,
waiter_delay,
waiter_max_attempts,
waiter_countdown,
waiter_check_interval_seconds,
expected,
warning,
):
operator = EmrServerlessDeleteApplicationOperator(
task_id=task_id,
application_id=application_id,
waiter_delay=waiter_delay,
waiter_max_attempts=waiter_max_attempts,
waiter_countdown=waiter_countdown,
waiter_check_interval_seconds=waiter_check_interval_seconds,
)
if warning:
with pytest.warns(
AirflowProviderDeprecationWarning,
match="The parameter waiter_.* has been deprecated to standardize naming conventions. Please use waiter_.* instead. .*In the future this will default to None and defer to the waiter's default value.",
):
operator = EmrServerlessDeleteApplicationOperator(
task_id=task_id,
application_id=application_id,
waiter_delay=waiter_delay,
waiter_max_attempts=waiter_max_attempts,
waiter_countdown=waiter_countdown,
waiter_check_interval_seconds=waiter_check_interval_seconds,
)
else:
operator = EmrServerlessDeleteApplicationOperator(
task_id=task_id,
application_id=application_id,
waiter_delay=waiter_delay,
waiter_max_attempts=waiter_max_attempts,
waiter_countdown=waiter_countdown,
waiter_check_interval_seconds=waiter_check_interval_seconds,
)
assert operator.wait_for_completion is True
assert operator.waiter_delay == expected[0]
assert operator.waiter_max_attempts == expected[1]
Expand Down