There is a bug in connecting to EKS using the airflow.providers.amazon.aws.operators.eks library in China. #45368

@QiaoLiar

Description

Apache Airflow Provider(s)

amazon

Versions of Apache Airflow Providers

9.2.0

Although I'm reporting against the latest version, I believe this bug exists in all historical versions.

Apache Airflow version

2.10.1

Operating System

Amazon Linux 2023

Deployment

Amazon (AWS) MWAA

Deployment details

This bug can be reproduced reliably without any customization.

What happened

My DAG script, which connects to an EKS cluster using [from airflow.providers.amazon.aws.operators.eks import EksPodOperator], returned a 401 Unauthorized error.

script:

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.amazon.aws.operators.eks import EksPodOperator
from kubernetes.client import models as k8s

from datetime import datetime

DEFAULT_ARGS = {
    'owner': 'XXX',
}


with DAG(
        'test2_eks_pod_operator_poc',
        default_args=DEFAULT_ARGS,
        schedule_interval=None,  # trigger manually for now
        start_date=datetime(2024, 4, 28),
        catchup=False,
        tags=['examples']
) as dag:

    start = DummyOperator(task_id='start', retries=2)
    end = DummyOperator(task_id='end', retries=2)

    test2_eks_pod_operator = EksPodOperator(
        task_id='test2_eks_pod_operator',
        region='cn-north-1',
        cluster_name='eks-cluster',
        namespace='mwaa',
        service_account_name='default',
        pod_name='eks_pod_operator_poc',
        image='amazon/aws-cli:latest',
        image_pull_policy='IfNotPresent',
        node_selector={
            'type': 'app'
        },
        tolerations=[
            k8s.V1Toleration(
                effect='NoSchedule',
                key='type',
                operator='Equal',
                value='app'
            )
        ],
        cmds=['/bin/bash', '-c'],
        arguments=['echo "hello world"'],
        is_delete_operator_pod=True,
    )

    start >> test2_eks_pod_operator >> end

error log:

*** Reading remote log from Cloudwatch log_group: airflow-mwaa-Task log_stream: dag_id=test2_eks_pod_operator_poc/run_id=manual__2024-12-31T08_47_12.225701+00_00/task_id=test2_eks_pod_operator/attempt=1.log.
[2024-12-31, 08:47:15 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2024-12-31, 08:47:16 UTC] {base.py:84} INFO - Retrieving connection 'aws_default'
[2024-12-31, 08:47:16 UTC] {baseoperator.py:405} WARNING - EksPodOperator.execute cannot be called outside TaskInstance!
[2024-12-31, 08:47:16 UTC] {pod.py:1133} INFO - Building pod eks-pod-operator-poc-ylyc6uh3 with labels: {'dag_id': 'test2_eks_pod_operator_poc', 'task_id': 'test2_eks_pod_operator', 'run_id': 'manual__2024-12-31T084712.2257010000-a8af56277', 'kubernetes_pod_operator': 'True', 'try_number': '1'}
[2024-12-31, 08:47:16 UTC] {base.py:84} INFO - Retrieving connection 'kubernetes_default'
[2024-12-31, 08:47:19 UTC] {taskinstance.py:3310} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/operators/eks.py", line 1103, in execute
    return super().execute(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 593, in execute
    return self.execute_sync(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 603, in execute_sync
    self.pod = self.get_or_create_pod(  # must set `self.pod` for `on_kill`
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 561, in get_or_create_pod
    pod = self.find_pod(self.namespace or pod_request_obj.metadata.namespace, context=context)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 534, in find_pod
    pod_list = self.client.list_namespaced_pod(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api/core_v1_api.py", line 15823, in list_namespaced_pod
    return self.list_namespaced_pod_with_http_info(namespace, **kwargs)  # noqa: E501
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api/core_v1_api.py", line 15942, in list_namespaced_pod_with_http_info
    return self.api_client.call_api(
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 348, in call_api
    return self.__call_api(resource_path, method,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
    response_data = self.request(
                    ^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 373, in request
    return self.rest_client.GET(url,
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/rest.py", line 244, in GET
    return self.request("GET", url,
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/rest.py", line 238, in request
    raise ApiException(http_resp=r)
kubernetes.client.exceptions.ApiException: (401)
Reason: Unauthorized
HTTP response headers: HTTPHeaderDict({'Audit-Id': '6bab071a-ed5b-41b9-9df7-d76d7247ebcd', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Tue, 31 Dec 2024 08:47:19 GMT', 'Content-Length': '129'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Unauthorized","reason":"Unauthorized","code":401}


[2024-12-31, 08:47:19 UTC] {taskinstance.py:1225} INFO - Marking task as FAILED. dag_id=test2_eks_pod_operator_poc, task_id=test2_eks_pod_operator, run_id=manual__2024-12-31T08:47:12.225701+00:00, execution_date=20241231T084712, start_date=20241231T084715, end_date=20241231T084719
[2024-12-31, 08:47:19 UTC] {taskinstance.py:340} ▶ Post task execution logs

What you think should happen instead

Here's the investigation I've done:

Examining the source code for EksPodOperator shows that this class automatically generates a kube_config file if no external kube_config file is specified during initialization:

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_modules/airflow/providers/amazon/aws/operators/eks.html
        # There is no need to manage the kube_config file, as it will be generated automatically.
        # All Kubernetes parameters (except config_file) are also valid for the EksPodOperator.

There seems to be a problem with this auto-generated kube_config file, so I printed its contents while debugging and examined the source code that generates it:

def execute(self, context: Context):
    eks_hook = EksHook(
        aws_conn_id=self.aws_conn_id,
        region_name=self.region,
    )
    with eks_hook.generate_config_file(
        eks_cluster_name=self.cluster_name, pod_namespace=self.namespace
    ) as self.config_file:
        return super().execute(context)
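
For debugging, the same file can be generated and dumped outside the operator. A minimal sketch, mirroring what execute does above (the connection ID, region, cluster name, and namespace are the values from my DAG):

from airflow.providers.amazon.aws.hooks.eks import EksHook

# Build the hook the same way EksPodOperator.execute does, let it generate a
# temporary kubeconfig, and print the file's contents.
eks_hook = EksHook(aws_conn_id="aws_default", region_name="cn-north-1")
with eks_hook.generate_config_file(
    eks_cluster_name="eks-cluster", pod_namespace="mwaa"
) as config_file:
    with open(config_file) as f:
        print(f.read())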

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_modules/airflow/providers/amazon/aws/hooks/eks.html

        cluster_config = {
            "apiVersion": "v1",
            "kind": "Config",
            "clusters": [
                {
                    "cluster": {"server": cluster_ep, "certificate-authority-data": cluster_cert},
                    "name": eks_cluster_name,
                }
            ],
            "contexts": [
                {
                    "context": {
                        "cluster": eks_cluster_name,
                        "namespace": pod_namespace,
                        "user": _POD_USERNAME,
                    },
                    "name": _CONTEXT_NAME,
                }
            ],
            "current-context": _CONTEXT_NAME,
            "preferences": {},
            "users": [
                {
                    "name": _POD_USERNAME,
                    "user": {
                        "exec": {
                            "apiVersion": AUTHENTICATION_API_VERSION,
                            "command": "sh",
                            "args": [
                                "-c",
                                COMMAND.format(
                                    python_executable=python_executable,
                                    eks_cluster_name=eks_cluster_name,
                                    args=args,
                                ),
                            ],
                            "interactiveMode": "Never",
                        }
                    },
                }
            ],
        }

Here the kubeconfig runs a shell command. Searching for the COMMAND variable shows the exact command executed; it fetches the EKS token:

COMMAND = """
            output=$({python_executable} -m airflow.providers.amazon.aws.utils.eks_get_token \
                --cluster-name {eks_cluster_name} {args} 2>&1)

            if [ $? -ne 0 ]; then
                echo "Error running the script"
                exit 1
            fi

            expiration_timestamp=$(echo "$output" | grep -oP 'expirationTimestamp: \\K[^,]+')
            token=$(echo "$output" | grep -oP 'token: \\K[^,]+')

            json_string=$(printf '{{"kind": "ExecCredential","apiVersion": \
                "client.authentication.k8s.io/v1alpha1","spec": {{}},"status": \
                {{"expirationTimestamp": "%s","token": "%s"}}}}' "$expiration_timestamp" "$token")
            echo $json_string
            """

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_modules/airflow/providers/amazon/aws/utils/eks_get_token.html

def main():
    parser = get_parser()
    args = parser.parse_args()
    eks_hook = EksHook(aws_conn_id=args.aws_conn_id, region_name=args.region_name)
    access_token = eks_hook.fetch_access_token_for_cluster(args.cluster_name)
    access_token_expiration = get_expiration_time()
    print(f"expirationTimestamp: {access_token_expiration}, token: {access_token}")

The access_token comes from eks_hook.fetch_access_token_for_cluster(args.cluster_name). Here is the implementation of the fetch_access_token_for_cluster method: https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_modules/airflow/providers/amazon/aws/hooks/eks.html

def fetch_access_token_for_cluster(self, eks_cluster_name: str) -> str:
    session = self.get_session()
    service_id = self.conn.meta.service_model.service_id
    sts_url = (
        f"https://sts.{session.region_name}.amazonaws.com/?Action=GetCallerIdentity&Version=2011-06-15"
    )

The STS address here points to the global endpoint, not the China STS endpoint, so the EKS token obtained cannot be used in the China (aws-cn) partition. The sts_url that should be used in China is f"https://sts.{session.region_name}.amazonaws.com.cn/?Action=GetCallerIdentity&Version=2011-06-15".
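
A minimal sketch of what a partition-aware fix could look like (build_sts_url is a hypothetical helper, not the provider's API; the hard-coded "cn-" prefix check stands in for a proper partition lookup, e.g. via botocore's endpoint data):

# Hypothetical helper: derive the STS DNS suffix from the region's partition
# instead of hard-coding the global amazonaws.com suffix.
def build_sts_url(region_name: str) -> str:
    # aws-cn regions (cn-north-1, cn-northwest-1) use amazonaws.com.cn;
    # every other region here falls back to the global partition.
    dns_suffix = "amazonaws.com.cn" if region_name.startswith("cn-") else "amazonaws.com"
    return f"https://sts.{region_name}.{dns_suffix}/?Action=GetCallerIdentity&Version=2011-06-15"

print(build_sts_url("cn-north-1"))
# https://sts.cn-north-1.amazonaws.com.cn/?Action=GetCallerIdentity&Version=2011-06-15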

How to reproduce

You can easily reproduce this using the DAG script above, provided you use the credentials of a China (aws-cn) AWS account.
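
Alternatively, the bad token can be inspected directly without running a DAG. A debugging sketch, assuming the token is "k8s-aws-v1." followed by the base64url-encoded presigned STS URL (the standard EKS token encoding):

import base64

from airflow.providers.amazon.aws.hooks.eks import EksHook

hook = EksHook(aws_conn_id="aws_default", region_name="cn-north-1")
token = hook.fetch_access_token_for_cluster("eks-cluster")

# Strip the scheme prefix and restore the base64 padding before decoding.
payload = token.removeprefix("k8s-aws-v1.")
payload += "=" * (-len(payload) % 4)
print(base64.urlsafe_b64decode(payload).decode())
# The decoded URL points at sts.cn-north-1.amazonaws.com (global), not .com.cn.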

Anything else

No

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
