Description
Apache Airflow Provider(s)
amazon
Versions of Apache Airflow Providers
9.2.0
Even though I'm using the latest version, I think this bug exists in all historical versions.
Apache Airflow version
2.10.1
Operating System
Amazon Linux 2023
Deployment
Amazon (AWS) MWAA
Deployment details
This bug can be reproduced reliably without any customization.
What happened
The DAG script below, which connects to an EKS cluster using EksPodOperator (from airflow.providers.amazon.aws.operators.eks), fails with a 401 Unauthorized error.
script:
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.amazon.aws.operators.eks import EksPodOperator
from kubernetes.client import models as k8s
from datetime import datetime

DEFAULT_ARGS = {
    'owner': 'XXX',
}

with DAG(
    'test2_eks_pod_operator_poc',
    default_args=DEFAULT_ARGS,
    schedule_interval=None,  # trigger manually for now
    start_date=datetime(2024, 4, 28),
    catchup=False,
    tags=['examples']
) as dag:
    start = DummyOperator(task_id='start', retries=2)
    end = DummyOperator(task_id='end', retries=2)
    test2_eks_pod_operator = EksPodOperator(
        task_id='test2_eks_pod_operator',
        region='cn-north-1',
        cluster_name='eks-cluster',
        namespace='mwaa',
        service_account_name='default',
        pod_name='eks_pod_operator_poc',
        image='amazon/aws-cli:latest',
        image_pull_policy='IfNotPresent',
        node_selector={
            'type': 'app'
        },
        tolerations=[
            k8s.V1Toleration(
                effect='NoSchedule',
                key='type',
                operator='Equal',
                value='app'
            )
        ],
        cmds=['/bin/bash', '-c'],
        arguments=['echo "hello world"'],
        is_delete_operator_pod=True,
    )
    start >> test2_eks_pod_operator >> end
error log:
*** Reading remote log from Cloudwatch log_group: airflow-mwaa-Task log_stream: dag_id=test2_eks_pod_operator_poc/run_id=manual__2024-12-31T08_47_12.225701+00_00/task_id=test2_eks_pod_operator/attempt=1.log.
[2024-12-31, 08:47:15 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2024-12-31, 08:47:16 UTC] {base.py:84} INFO - Retrieving connection 'aws_default'
[2024-12-31, 08:47:16 UTC] {baseoperator.py:405} WARNING - EksPodOperator.execute cannot be called outside TaskInstance!
[2024-12-31, 08:47:16 UTC] {pod.py:1133} INFO - Building pod eks-pod-operator-poc-ylyc6uh3 with labels: {'dag_id': 'test2_eks_pod_operator_poc', 'task_id': 'test2_eks_pod_operator', 'run_id': 'manual__2024-12-31T084712.2257010000-a8af56277', 'kubernetes_pod_operator': 'True', 'try_number': '1'}
[2024-12-31, 08:47:16 UTC] {base.py:84} INFO - Retrieving connection 'kubernetes_default'
[2024-12-31, 08:47:19 UTC] {taskinstance.py:3310} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/operators/eks.py", line 1103, in execute
    return super().execute(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 593, in execute
    return self.execute_sync(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 603, in execute_sync
    self.pod = self.get_or_create_pod(  # must set `self.pod` for `on_kill`
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 561, in get_or_create_pod
    pod = self.find_pod(self.namespace or pod_request_obj.metadata.namespace, context=context)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 534, in find_pod
    pod_list = self.client.list_namespaced_pod(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api/core_v1_api.py", line 15823, in list_namespaced_pod
    return self.list_namespaced_pod_with_http_info(namespace, **kwargs)  # noqa: E501
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api/core_v1_api.py", line 15942, in list_namespaced_pod_with_http_info
    return self.api_client.call_api(
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 348, in call_api
    return self.__call_api(resource_path, method,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
    response_data = self.request(
                    ^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 373, in request
    return self.rest_client.GET(url,
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/rest.py", line 244, in GET
    return self.request("GET", url,
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/rest.py", line 238, in request
    raise ApiException(http_resp=r)
kubernetes.client.exceptions.ApiException: (401)
Reason: Unauthorized
HTTP response headers: HTTPHeaderDict({'Audit-Id': '6bab071a-ed5b-41b9-9df7-d76d7247ebcd', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Tue, 31 Dec 2024 08:47:19 GMT', 'Content-Length': '129'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Unauthorized","reason":"Unauthorized","code":401}
[2024-12-31, 08:47:19 UTC] {taskinstance.py:1225} INFO - Marking task as FAILED. dag_id=test2_eks_pod_operator_poc, task_id=test2_eks_pod_operator, run_id=manual__2024-12-31T08:47:12.225701+00:00, execution_date=20241231T084712, start_date=20241231T084715, end_date=20241231T084719
[2024-12-31, 08:47:19 UTC] {taskinstance.py:340} ▶ Post task execution logs
What you think should happen instead
Here's the investigation I've done:
Examining the source code for EksPodOperator shows that this class automatically generates a kube_config file if no external kube_config file is specified during initialization:
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_modules/airflow/providers/amazon/aws/operators/eks.html
# There is no need to manage the kube_config file, as it will be generated automatically.
# All Kubernetes parameters (except config_file) are also valid for the EksPodOperator.
There seems to be a problem with this auto-generated kube_config file, so I printed its contents while debugging and examined the source code that generates it:
def execute(self, context: Context):
    eks_hook = EksHook(
        aws_conn_id=self.aws_conn_id,
        region_name=self.region,
    )
    with eks_hook.generate_config_file(
        eks_cluster_name=self.cluster_name, pod_namespace=self.namespace
    ) as self.config_file:
        return super().execute(context)
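For debugging, the same file can be generated and dumped outside the operator. A minimal sketch, assuming the aws_default connection and the cluster/namespace values from the DAG above:

from airflow.providers.amazon.aws.hooks.eks import EksHook

# Build the same hook the operator builds; generate_config_file yields
# the path of the temporary kubeconfig so its contents can be printed.
eks_hook = EksHook(aws_conn_id='aws_default', region_name='cn-north-1')
with eks_hook.generate_config_file(
    eks_cluster_name='eks-cluster', pod_namespace='mwaa'
) as config_file:
    with open(config_file) as f:
        print(f.read())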
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_modules/airflow/providers/amazon/aws/hooks/eks.html
cluster_config = {
    "apiVersion": "v1",
    "kind": "Config",
    "clusters": [
        {
            "cluster": {"server": cluster_ep, "certificate-authority-data": cluster_cert},
            "name": eks_cluster_name,
        }
    ],
    "contexts": [
        {
            "context": {
                "cluster": eks_cluster_name,
                "namespace": pod_namespace,
                "user": _POD_USERNAME,
            },
            "name": _CONTEXT_NAME,
        }
    ],
    "current-context": _CONTEXT_NAME,
    "preferences": {},
    "users": [
        {
            "name": _POD_USERNAME,
            "user": {
                "exec": {
                    "apiVersion": AUTHENTICATION_API_VERSION,
                    "command": "sh",
                    "args": [
                        "-c",
                        COMMAND.format(
                            python_executable=python_executable,
                            eks_cluster_name=eks_cluster_name,
                            args=args,
                        ),
                    ],
                    "interactiveMode": "Never",
                }
            },
        }
    ],
}
Here the user entry executes a shell command. Searching for the COMMAND variable shows the exact command being run, and you can see that it fetches the EKS token:
COMMAND = """
output=$({python_executable} -m airflow.providers.amazon.aws.utils.eks_get_token \
--cluster-name {eks_cluster_name} {args} 2>&1)
if [ $? -ne 0 ]; then
echo "Error running the script"
exit 1
fi
expiration_timestamp=$(echo "$output" | grep -oP 'expirationTimestamp: \\K[^,]+')
token=$(echo "$output" | grep -oP 'token: \\K[^,]+')
json_string=$(printf '{{"kind": "ExecCredential","apiVersion": \
"client.authentication.k8s.io/v1alpha1","spec": {{}},"status": \
{{"expirationTimestamp": "%s","token": "%s"}}}}' "$expiration_timestamp" "$token")
echo $json_string
"""
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_modules/airflow/providers/amazon/aws/utils/eks_get_token.html
def main():
    parser = get_parser()
    args = parser.parse_args()
    eks_hook = EksHook(aws_conn_id=args.aws_conn_id, region_name=args.region_name)
    access_token = eks_hook.fetch_access_token_for_cluster(args.cluster_name)
    access_token_expiration = get_expiration_time()
    print(f"expirationTimestamp: {access_token_expiration}, token: {access_token}")
The access_token comes from eks_hook.fetch_access_token_for_cluster(args.cluster_name). Check out the implementation of the fetch_access_token_for_cluster method: https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_modules/airflow/providers/amazon/aws/hooks/eks.html
def fetch_access_token_for_cluster(self, eks_cluster_name: str) -> str:
    session = self.get_session()
    service_id = self.conn.meta.service_model.service_id
    sts_url = (
        f"https://sts.{session.region_name}.amazonaws.com/?Action=GetCallerIdentity&Version=2011-06-15"
    )
The STS URL here points to the global AWS domain, not the China STS service endpoint, so the EKS token obtained cannot be used in China. The sts_url that should be used in China is f"https://sts.{session.region_name}.amazonaws.com.cn/?Action=GetCallerIdentity&Version=2011-06-15". A sketch of a partition-aware fix follows.
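A minimal sketch of such a fix (illustrative only; _get_sts_url is a hypothetical helper, not the provider's actual code):

def _get_sts_url(session) -> str:
    # Pick the partition DNS suffix from the region: AWS China regions
    # (cn-north-1, cn-northwest-1) live in the aws-cn partition, whose
    # suffix is amazonaws.com.cn rather than amazonaws.com.
    region = session.region_name
    suffix = 'amazonaws.com.cn' if region.startswith('cn-') else 'amazonaws.com'
    return f'https://sts.{region}.{suffix}/?Action=GetCallerIdentity&Version=2011-06-15'

A more robust variant could let botocore resolve the endpoint (for example via session.client('sts').meta.endpoint_url), which would cover all partitions without hardcoding suffixes.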
How to reproduce
You can easily reproduce this using the DAG script above, provided you use the credentials of an AWS China account.
Anything else
no
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct