Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 23 additions & 19 deletions airflow/providers/amazon/aws/example_dags/example_eks_templated.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@
# specific language governing permissions and limitations
# under the License.

# Ignore missing args provided by default_args
# type: ignore[call-arg]
# mypy ignore arg types (for templated fields)
# type: ignore[arg-type]

import os
from datetime import datetime

from airflow.models.dag import DAG
Expand All @@ -37,62 +36,63 @@
{
"cluster_name": "templated-cluster",
"cluster_role_arn": "arn:aws:iam::123456789012:role/role_name",
"nodegroup_subnets": ["subnet-12345ab", "subnet-67890cd"],
"resources_vpc_config": {
"subnetIds": ["subnet-12345ab", "subnet-67890cd"],
"endpointPublicAccess": true,
"endpointPrivateAccess": false
},
"nodegroup_name": "templated-nodegroup",
"nodegroup_subnets": "['subnet-12345ab', 'subnet-67890cd']",
"nodegroup_role_arn": "arn:aws:iam::123456789012:role/role_name"
}
"""

with DAG(
dag_id='to-publish-manuals-templated',
default_args={'cluster_name': "{{ dag_run.conf['cluster_name'] }}"},
dag_id='example_eks_templated',
schedule_interval=None,
start_date=datetime(2021, 1, 1),
catchup=False,
max_active_runs=1,
tags=['example', 'templated'],
catchup=False,
# render_template_as_native_obj=True is what converts the Jinja to Python objects, instead of a string.
render_template_as_native_obj=True,
) as dag:
SUBNETS = os.environ.get('EKS_DEMO_SUBNETS', 'subnet-12345ab subnet-67890cd').split(' ')
VPC_CONFIG = {
'subnetIds': SUBNETS,
'endpointPublicAccess': True,
'endpointPrivateAccess': False,
}

CLUSTER_NAME = "{{ dag_run.conf['cluster_name'] }}"
NODEGROUP_NAME = "{{ dag_run.conf['nodegroup_name'] }}"

# Create an Amazon EKS Cluster control plane without attaching a compute service.
create_cluster = EksCreateClusterOperator(
task_id='create_eks_cluster',
cluster_name=CLUSTER_NAME,
compute=None,
cluster_role_arn="{{ dag_run.conf['cluster_role_arn'] }}",
resources_vpc_config=VPC_CONFIG,
resources_vpc_config="{{ dag_run.conf['resources_vpc_config'] }}",
)

await_create_cluster = EksClusterStateSensor(
task_id='wait_for_create_cluster',
cluster_name=CLUSTER_NAME,
target_state=ClusterStates.ACTIVE,
)

create_nodegroup = EksCreateNodegroupOperator(
task_id='create_eks_nodegroup',
nodegroup_name="{{ dag_run.conf['nodegroup_name'] }}",
cluster_name=CLUSTER_NAME,
nodegroup_name=NODEGROUP_NAME,
nodegroup_subnets="{{ dag_run.conf['nodegroup_subnets'] }}",
nodegroup_role_arn="{{ dag_run.conf['nodegroup_role_arn'] }}",
)

await_create_nodegroup = EksNodegroupStateSensor(
task_id='wait_for_create_nodegroup',
nodegroup_name="{{ dag_run.conf['nodegroup_name'] }}",
cluster_name=CLUSTER_NAME,
nodegroup_name=NODEGROUP_NAME,
target_state=NodegroupStates.ACTIVE,
)

start_pod = EksPodOperator(
task_id="run_pod",
cluster_name=CLUSTER_NAME,
pod_name="run_pod",
image="amazon/aws-cli:latest",
cmds=["sh", "-c", "ls"],
Expand All @@ -104,21 +104,25 @@

delete_nodegroup = EksDeleteNodegroupOperator(
task_id='delete_eks_nodegroup',
nodegroup_name="{{ dag_run.conf['nodegroup_name'] }}",
cluster_name=CLUSTER_NAME,
nodegroup_name=NODEGROUP_NAME,
)

await_delete_nodegroup = EksNodegroupStateSensor(
task_id='wait_for_delete_nodegroup',
nodegroup_name="{{ dag_run.conf['nodegroup_name'] }}",
cluster_name=CLUSTER_NAME,
nodegroup_name=NODEGROUP_NAME,
target_state=NodegroupStates.NONEXISTENT,
)

delete_cluster = EksDeleteClusterOperator(
task_id='delete_eks_cluster',
cluster_name=CLUSTER_NAME,
)

await_delete_cluster = EksClusterStateSensor(
task_id='wait_for_delete_cluster',
cluster_name=CLUSTER_NAME,
target_state=ClusterStates.NONEXISTENT,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,18 @@


with DAG(
dag_id='example-create-cluster-and-fargate-all-in-one',
default_args={'cluster_name': CLUSTER_NAME},
dag_id='example_eks_with_fargate_in_one_step',
schedule_interval=None,
start_date=datetime(2021, 1, 1),
catchup=False,
max_active_runs=1,
tags=['example'],
catchup=False,
) as dag:

# [START howto_operator_eks_create_cluster_with_fargate_profile]
# Create an Amazon EKS cluster control plane and an AWS Fargate compute platform in one step.
create_cluster_and_fargate_profile = EksCreateClusterOperator(
task_id='create_eks_cluster_and_fargate_profile',
cluster_name=CLUSTER_NAME,
cluster_role_arn=ROLE_ARN,
resources_vpc_config=VPC_CONFIG,
compute='fargate',
Expand All @@ -68,13 +67,15 @@

await_create_fargate_profile = EksFargateProfileStateSensor(
task_id='wait_for_create_fargate_profile',
cluster_name=CLUSTER_NAME,
fargate_profile_name=FARGATE_PROFILE_NAME,
target_state=FargateProfileStates.ACTIVE,
)

start_pod = EksPodOperator(
task_id="run_pod",
pod_name="run_pod",
cluster_name=CLUSTER_NAME,
image="amazon/aws-cli:latest",
cmds=["sh", "-c", "echo Test Airflow; date"],
labels={"demo": "hello_world"},
Expand All @@ -86,11 +87,14 @@
# An Amazon EKS cluster can not be deleted with attached resources such as nodegroups or Fargate profiles.
# Setting the `force` to `True` will delete any attached resources before deleting the cluster.
delete_all = EksDeleteClusterOperator(
task_id='delete_fargate_profile_and_cluster', force_delete_compute=True
task_id='delete_fargate_profile_and_cluster',
cluster_name=CLUSTER_NAME,
force_delete_compute=True,
)

await_delete_cluster = EksClusterStateSensor(
task_id='wait_for_delete_cluster',
cluster_name=CLUSTER_NAME,
target_state=ClusterStates.NONEXISTENT,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,45 +46,50 @@


with DAG(
dag_id='example_eks_with_fargate_profile_dag',
default_args={'cluster_name': CLUSTER_NAME},
dag_id='example_eks_with_fargate_profile',
schedule_interval=None,
start_date=datetime(2021, 1, 1),
catchup=False,
max_active_runs=1,
tags=['example'],
catchup=False,
) as dag:

# Create an Amazon EKS Cluster control plane without attaching a compute service.
create_cluster = EksCreateClusterOperator(
task_id='create_eks_cluster',
cluster_name=CLUSTER_NAME,
cluster_role_arn=ROLE_ARN,
resources_vpc_config=VPC_CONFIG,
compute=None,
)

await_create_cluster = EksClusterStateSensor(
task_id='wait_for_create_cluster',
cluster_name=CLUSTER_NAME,
target_state=ClusterStates.ACTIVE,
)

# [START howto_operator_eks_create_fargate_profile]
create_fargate_profile = EksCreateFargateProfileOperator(
task_id='create_eks_fargate_profile',
cluster_name=CLUSTER_NAME,
pod_execution_role_arn=ROLE_ARN,
fargate_profile_name=FARGATE_PROFILE_NAME,
selectors=SELECTORS,
)
# [END howto_operator_eks_create_fargate_profile]

# [START howto_sensor_eks_fargate]
await_create_fargate_profile = EksFargateProfileStateSensor(
task_id='wait_for_create_fargate_profile',
cluster_name=CLUSTER_NAME,
fargate_profile_name=FARGATE_PROFILE_NAME,
target_state=FargateProfileStates.ACTIVE,
)
# [END howto_sensor_eks_fargate]

start_pod = EksPodOperator(
task_id="run_pod",
cluster_name=CLUSTER_NAME,
pod_name="run_pod",
image="amazon/aws-cli:latest",
cmds=["sh", "-c", "echo Test Airflow; date"],
Expand All @@ -97,20 +102,26 @@
# [START howto_operator_eks_delete_fargate_profile]
delete_fargate_profile = EksDeleteFargateProfileOperator(
task_id='delete_eks_fargate_profile',
cluster_name=CLUSTER_NAME,
fargate_profile_name=FARGATE_PROFILE_NAME,
)
# [END howto_operator_eks_delete_fargate_profile]

await_delete_fargate_profile = EksFargateProfileStateSensor(
task_id='wait_for_delete_fargate_profile',
cluster_name=CLUSTER_NAME,
fargate_profile_name=FARGATE_PROFILE_NAME,
target_state=FargateProfileStates.NONEXISTENT,
)

delete_cluster = EksDeleteClusterOperator(task_id='delete_eks_cluster')
delete_cluster = EksDeleteClusterOperator(
task_id='delete_eks_cluster',
cluster_name=CLUSTER_NAME,
)

await_delete_cluster = EksClusterStateSensor(
task_id='wait_for_delete_cluster',
cluster_name=CLUSTER_NAME,
target_state=ClusterStates.NONEXISTENT,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,18 @@


with DAG(
dag_id='example_eks_using_defaults_dag',
default_args={'cluster_name': CLUSTER_NAME},
dag_id='example_eks_with_nodegroup_in_one_step',
schedule_interval=None,
start_date=datetime(2021, 1, 1),
catchup=False,
max_active_runs=1,
tags=['example'],
catchup=False,
) as dag:

# [START howto_operator_eks_create_cluster_with_nodegroup]
# Create an Amazon EKS cluster control plane and an EKS nodegroup compute platform in one step.
create_cluster_and_nodegroup = EksCreateClusterOperator(
task_id='create_eks_cluster_and_nodegroup',
cluster_name=CLUSTER_NAME,
nodegroup_name=NODEGROUP_NAME,
cluster_role_arn=ROLE_ARN,
nodegroup_role_arn=ROLE_ARN,
Expand All @@ -68,12 +67,14 @@

await_create_nodegroup = EksNodegroupStateSensor(
task_id='wait_for_create_nodegroup',
cluster_name=CLUSTER_NAME,
nodegroup_name=NODEGROUP_NAME,
target_state=NodegroupStates.ACTIVE,
)

start_pod = EksPodOperator(
task_id="run_pod",
cluster_name=CLUSTER_NAME,
pod_name="run_pod",
image="amazon/aws-cli:latest",
cmds=["sh", "-c", "echo Test Airflow; date"],
Expand All @@ -86,11 +87,16 @@
# [START howto_operator_eks_force_delete_cluster]
# An Amazon EKS cluster can not be deleted with attached resources such as nodegroups or Fargate profiles.
# Setting the `force` to `True` will delete any attached resources before deleting the cluster.
delete_all = EksDeleteClusterOperator(task_id='delete_nodegroup_and_cluster', force_delete_compute=True)
delete_all = EksDeleteClusterOperator(
task_id='delete_nodegroup_and_cluster',
cluster_name=CLUSTER_NAME,
force_delete_compute=True,
)
# [END howto_operator_eks_force_delete_cluster]

await_delete_cluster = EksClusterStateSensor(
task_id='wait_for_delete_cluster',
cluster_name=CLUSTER_NAME,
target_state=ClusterStates.NONEXISTENT,
)

Expand Down
Loading