Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 36 additions & 28 deletions airflow/providers/amazon/aws/operators/emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,30 +263,34 @@ def __init__(
wait_for_completion: bool = False,
aws_conn_id: str | None = "aws_default",
# TODO: waiter_max_attempts and waiter_delay should default to None when the other two are deprecated.
waiter_max_attempts: int | None | ArgNotSet = NOTSET,
waiter_delay: int | None | ArgNotSet = NOTSET,
waiter_countdown: int = 25 * 60,
waiter_check_interval_seconds: int = 60,
waiter_max_attempts: int | None = None,
waiter_delay: int | None = None,
waiter_countdown: int | None = None,
waiter_check_interval_seconds: int | None = None,
**kwargs: Any,
):
if waiter_max_attempts is NOTSET:
if waiter_check_interval_seconds:
warnings.warn(
"The parameter waiter_countdown has been deprecated to standardize "
"naming conventions. Please use waiter_max_attempts instead. In the "
"The parameter `waiter_check_interval_seconds` has been deprecated to "
"standardize naming conventions. Please `use waiter_delay instead`. In the "
"future this will default to None and defer to the waiter's default value.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
waiter_max_attempts = waiter_countdown // waiter_check_interval_seconds
if waiter_delay is NOTSET:
else:
waiter_check_interval_seconds = 60
if waiter_countdown:
warnings.warn(
"The parameter waiter_check_interval_seconds has been deprecated to "
"standardize naming conventions. Please use waiter_delay instead. In the "
"The parameter waiter_countdown has been deprecated to standardize "
"naming conventions. Please use waiter_max_attempts instead. In the "
"future this will default to None and defer to the waiter's default value.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
waiter_delay = waiter_check_interval_seconds
# waiter_countdown defaults to never timing out, which is not supported
# by boto waiters, so we will set it here to "a very long time" for now.
waiter_max_attempts = (waiter_countdown or 999) // waiter_check_interval_seconds

super().__init__(**kwargs)
self.editor_id = editor_id
self.relative_path = relative_path
Expand All @@ -298,8 +302,8 @@ def __init__(
self.wait_for_completion = wait_for_completion
self.cluster_id = cluster_id
self.aws_conn_id = aws_conn_id
self.waiter_max_attempts = waiter_max_attempts
self.waiter_delay = waiter_delay
self.waiter_max_attempts = waiter_max_attempts or 25
self.waiter_delay = waiter_delay or waiter_check_interval_seconds or 60
self.master_instance_security_group_id = master_instance_security_group_id

def execute(self, context: Context):
Expand Down Expand Up @@ -387,36 +391,40 @@ def __init__(
wait_for_completion: bool = False,
aws_conn_id: str | None = "aws_default",
# TODO: waiter_max_attempts and waiter_delay should default to None when the other two are deprecated.
waiter_max_attempts: int | None | ArgNotSet = NOTSET,
waiter_delay: int | None | ArgNotSet = NOTSET,
waiter_countdown: int = 25 * 60,
waiter_check_interval_seconds: int = 60,
waiter_max_attempts: int | None = None,
waiter_delay: int | None = None,
waiter_countdown: int | None = None,
waiter_check_interval_seconds: int | None = None,
**kwargs: Any,
):
if waiter_max_attempts is NOTSET:
if waiter_check_interval_seconds:
warnings.warn(
"The parameter waiter_countdown has been deprecated to standardize "
"naming conventions. Please use waiter_max_attempts instead. In the "
"The parameter `waiter_check_interval_seconds` has been deprecated to "
"standardize naming conventions. Please `use waiter_delay instead`. In the "
"future this will default to None and defer to the waiter's default value.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
waiter_max_attempts = waiter_countdown // waiter_check_interval_seconds
if waiter_delay is NOTSET:
else:
waiter_check_interval_seconds = 60
if waiter_countdown:
warnings.warn(
"The parameter waiter_check_interval_seconds has been deprecated to "
"standardize naming conventions. Please use waiter_delay instead. In the "
"The parameter waiter_countdown has been deprecated to standardize "
"naming conventions. Please use waiter_max_attempts instead. In the "
"future this will default to None and defer to the waiter's default value.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
waiter_delay = waiter_check_interval_seconds
# waiter_countdown defaults to never timing out, which is not supported
# by boto waiters, so we will set it here to "a very long time" for now.
waiter_max_attempts = (waiter_countdown or 999) // waiter_check_interval_seconds

super().__init__(**kwargs)
self.notebook_execution_id = notebook_execution_id
self.wait_for_completion = wait_for_completion
self.aws_conn_id = aws_conn_id
self.waiter_max_attempts = waiter_max_attempts
self.waiter_delay = waiter_delay
self.waiter_max_attempts = waiter_max_attempts or 25
self.waiter_delay = waiter_delay or waiter_check_interval_seconds or 60

def execute(self, context: Context) -> None:
emr_hook = EmrHook(aws_conn_id=self.aws_conn_id)
Expand Down
1 change: 0 additions & 1 deletion tests/always/test_example_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
# Generally, these should be resolved as soon as a parameter or operator is deprecated.
# If the deprecation is postponed, the item should be added to this tuple,
# and a corresponding Issue should be created on GitHub.
"tests/system/providers/amazon/aws/example_emr_notebook_execution.py",
"tests/system/providers/google/cloud/bigquery/example_bigquery_operations.py",
"tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py",
"tests/system/providers/google/cloud/gcs/example_gcs_sensor.py",
Expand Down