Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ class EksCreateClusterOperator(AwsBaseOperator[EksHook]):
"fargate_selectors",
"create_fargate_profile_kwargs",
"wait_for_completion",
"region",
)

def __init__(
Expand Down Expand Up @@ -259,18 +258,16 @@ def __init__(
self.fargate_selectors = fargate_selectors or [{"namespace": DEFAULT_NAMESPACE_NAME}]
self.fargate_profile_name = fargate_profile_name
self.deferrable = deferrable
self.region = region
super().__init__(
**kwargs,
)

if region is not None:
self.region_name = region
warnings.warn(
message="Parameter `region` will be deprecated. Use the parameter `region_name` instead",
message="Parameter `region` is deprecated. Use the parameter `region_name` instead",
category=AirflowProviderDeprecationWarning,
stacklevel=2,
)
kwargs["region_name"] = region

super().__init__(**kwargs)

def execute(self, context: Context):
if self.compute:
Expand Down Expand Up @@ -468,7 +465,6 @@ class EksCreateNodegroupOperator(AwsBaseOperator[EksHook]):
"nodegroup_name",
"create_nodegroup_kwargs",
"wait_for_completion",
"region",
)

def __init__(
Expand Down Expand Up @@ -497,16 +493,16 @@ def __init__(
self.waiter_delay = waiter_delay
self.waiter_max_attempts = waiter_max_attempts
self.deferrable = deferrable
self.region = region
super().__init__(**kwargs)

if region is not None:
self.region_name = region
warnings.warn(
message="Parameter `region` will be deprecated. Use the parameter `region_name` instead",
message="Parameter `region` is deprecated. Use the parameter `region_name` instead",
category=AirflowProviderDeprecationWarning,
stacklevel=2,
)
kwargs["region_name"] = region

super().__init__(**kwargs)

def execute(self, context: Context):
self.log.info(self.task_id)
Expand Down Expand Up @@ -599,7 +595,6 @@ class EksCreateFargateProfileOperator(AwsBaseOperator[EksHook]):
"fargate_profile_name",
"create_fargate_profile_kwargs",
"wait_for_completion",
"region",
)

def __init__(
Expand Down Expand Up @@ -628,17 +623,15 @@ def __init__(
self.waiter_max_attempts = waiter_max_attempts
self.deferrable = deferrable
self.compute = "fargate"
self.region = region
super().__init__(
**kwargs,
)

if region is not None:
self.region_name = region
warnings.warn(
message="Parameter `region` will be deprecated. Use the parameter `region_name` instead",
message="Parameter `region` is deprecated. Use the parameter `region_name` instead",
category=AirflowProviderDeprecationWarning,
stacklevel=2,
)
kwargs["region_name"] = region
super().__init__(**kwargs)

def execute(self, context: Context):
_create_compute(
Expand Down Expand Up @@ -710,7 +703,7 @@ class EksDeleteClusterOperator(AwsBaseOperator[EksHook]):

aws_hook_class = EksHook
template_fields: Sequence[str] = aws_template_fields(
"cluster_name", "force_delete_compute", "wait_for_completion", "region"
"cluster_name", "force_delete_compute", "wait_for_completion"
)

def __init__(
Expand All @@ -732,16 +725,16 @@ def __init__(
self.deferrable = deferrable
self.waiter_delay = waiter_delay
self.waiter_max_attempts = waiter_max_attempts
self.region = region
super().__init__(**kwargs)

if region is not None:
self.region_name = region
warnings.warn(
message="Parameter `region` will be deprecated. Use the parameter `region_name` instead",
message="Parameter `region` is deprecated. Use the parameter `region_name` instead",
category=AirflowProviderDeprecationWarning,
stacklevel=2,
)
kwargs["region_name"] = region

super().__init__(**kwargs)

def execute(self, context: Context):
if self.deferrable:
Expand Down Expand Up @@ -842,7 +835,7 @@ class EksDeleteNodegroupOperator(AwsBaseOperator[EksHook]):

aws_hook_class = EksHook
template_fields: Sequence[str] = aws_template_fields(
"cluster_name", "nodegroup_name", "wait_for_completion", "region"
"cluster_name", "nodegroup_name", "wait_for_completion"
)

def __init__(
Expand All @@ -862,16 +855,15 @@ def __init__(
self.waiter_delay = waiter_delay
self.waiter_max_attempts = waiter_max_attempts
self.deferrable = deferrable
self.region = region
super().__init__(**kwargs)

if region is not None:
self.region_name = region
warnings.warn(
message="Parameter `region` will be deprecated. Use the parameter `region_name` instead",
message="Parameter `region` is deprecated. Use the parameter `region_name` instead",
category=AirflowProviderDeprecationWarning,
stacklevel=2,
)
kwargs["region_name"] = region
super().__init__(**kwargs)

def execute(self, context: Context):
self.hook.delete_nodegroup(clusterName=self.cluster_name, nodegroupName=self.nodegroup_name)
Expand Down Expand Up @@ -932,7 +924,7 @@ class EksDeleteFargateProfileOperator(AwsBaseOperator[EksHook]):

aws_hook_class = EksHook
template_fields: Sequence[str] = aws_template_fields(
"cluster_name", "fargate_profile_name", "wait_for_completion", "region"
"cluster_name", "fargate_profile_name", "wait_for_completion"
)

def __init__(
Expand All @@ -946,21 +938,20 @@ def __init__(
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
) -> None:
super().__init__(**kwargs)
self.cluster_name = cluster_name
self.fargate_profile_name = fargate_profile_name
self.wait_for_completion = wait_for_completion
self.waiter_delay = waiter_delay
self.waiter_max_attempts = waiter_max_attempts
self.deferrable = deferrable
self.region = region
if region is not None:
self.region_name = region
warnings.warn(
message="Parameter `region` will be deprecated. Use the parameter `region_name` instead",
message="Parameter `region` is deprecated. Use the parameter `region_name` instead",
category=AirflowProviderDeprecationWarning,
stacklevel=2,
)
kwargs["region_name"] = region
super().__init__(**kwargs)

def execute(self, context: Context):
self.hook.delete_fargate_profile(
Expand Down
32 changes: 14 additions & 18 deletions providers/amazon/src/airflow/providers/amazon/aws/sensors/eks.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class EksClusterStateSensor(EksBaseSensor):
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
"""

template_fields: Sequence[str] = aws_template_fields("cluster_name", "target_state", "region")
template_fields: Sequence[str] = aws_template_fields("cluster_name", "target_state")
ui_color = "#ff9900"
ui_fgcolor = "#232F3E"

Expand All @@ -144,14 +144,14 @@ def __init__(
region: str | None = None,
**kwargs,
):
super().__init__(target_state=target_state, target_state_type=ClusterStates, **kwargs)
if region is not None:
self.region_name = region
warnings.warn(
message="Parameter `region` will be deprecated. Use the parameter `region_name` instead",
message="Parameter `region` is deprecated. Use the parameter `region_name` instead",
category=AirflowProviderDeprecationWarning,
stacklevel=2,
)
kwargs["region_name"] = region
super().__init__(target_state=target_state, target_state_type=ClusterStates, **kwargs)

def get_state(self) -> ClusterStates:
return self.hook.get_cluster_state(clusterName=self.cluster_name)
Expand Down Expand Up @@ -182,7 +182,7 @@ class EksFargateProfileStateSensor(EksBaseSensor):
"""

template_fields: Sequence[str] = aws_template_fields(
"cluster_name", "fargate_profile_name", "target_state", "region"
"cluster_name", "fargate_profile_name", "target_state"
)
ui_color = "#ff9900"
ui_fgcolor = "#232F3E"
Expand All @@ -195,16 +195,15 @@ def __init__(
target_state: FargateProfileStates = FargateProfileStates.ACTIVE,
**kwargs,
):
super().__init__(target_state=target_state, target_state_type=FargateProfileStates, **kwargs)
self.fargate_profile_name = fargate_profile_name
self.region = region
if region is not None:
self.region_name = region
warnings.warn(
message="Parameter `region` will be deprecated. Use the parameter `region_name` instead",
message="Parameter `region` is deprecated. Use the parameter `region_name` instead",
category=AirflowProviderDeprecationWarning,
stacklevel=2,
)
kwargs["region_name"] = region
super().__init__(target_state=target_state, target_state_type=FargateProfileStates, **kwargs)
self.fargate_profile_name = fargate_profile_name

def get_state(self) -> FargateProfileStates:
return self.hook.get_fargate_profile_state(
Expand Down Expand Up @@ -236,9 +235,7 @@ class EksNodegroupStateSensor(EksBaseSensor):
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
"""

template_fields: Sequence[str] = aws_template_fields(
"cluster_name", "nodegroup_name", "target_state", "region"
)
template_fields: Sequence[str] = aws_template_fields("cluster_name", "nodegroup_name", "target_state")
ui_color = "#ff9900"
ui_fgcolor = "#232F3E"

Expand All @@ -250,16 +247,15 @@ def __init__(
region: str | None = None,
**kwargs,
):
super().__init__(target_state=target_state, target_state_type=NodegroupStates, **kwargs)
self.region = region
self.nodegroup_name = nodegroup_name
if region is not None:
self.region_name = region
warnings.warn(
message="Parameter `region` will be deprecated. Use the parameter `region_name` instead",
message="Parameter `region` is deprecated. Use the parameter `region_name` instead",
category=AirflowProviderDeprecationWarning,
stacklevel=2,
)
kwargs["region_name"] = region
super().__init__(target_state=target_state, target_state_type=NodegroupStates, **kwargs)
self.nodegroup_name = nodegroup_name

def get_state(self) -> NodegroupStates:
return self.hook.get_nodegroup_state(clusterName=self.cluster_name, nodegroupName=self.nodegroup_name)
Expand Down