Apache Airflow version
Other Airflow 2 version (please specify below)
What happened
Google Cloud Composer Version - 2.1.5
Airflow Version - 2.4.3
We are trying to use dynamic task mapping with the KubernetesPodOperator. Our use case is to return the pod's CPU and memory requirements from functions that are registered as user-defined macros in the DAG.
Without dynamic task mapping this works perfectly, but with dynamic task mapping the macros are never resolved.
container_resources is a templated field as per the docs; the feature was introduced in this PR.
We also tried toggling the boolean render_template_as_native_obj, but still no luck.
Below is a trimmed version of our DAG to help reproduce the issue (the functions returning CPU and memory are trivial here, just to illustrate the problem).
What you think should happen instead
Template rendering should work the same with or without dynamic task mapping.
How to reproduce
Deploy the following DAG in Google Cloud Composer:
import datetime
import os

from airflow import models
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s_models

dvt_image = os.environ.get("DVT_IMAGE")

default_dag_args = {"start_date": datetime.datetime(2022, 1, 1)}


def pod_mem():
    return "4000M"


def pod_cpu():
    return "1000m"


with models.DAG(
    "sample_dag",
    schedule_interval=None,
    default_args=default_dag_args,
    render_template_as_native_obj=True,
    user_defined_macros={
        "pod_mem": pod_mem,
        "pod_cpu": pod_cpu,
    },
) as dag:
    task_1 = KubernetesPodOperator(
        task_id="task_1",
        name="task_1",
        namespace="default",
        image=dvt_image,
        cmds=["bash", "-cx"],
        arguments=["echo hello"],
        service_account_name="sa-k8s",
        container_resources=k8s_models.V1ResourceRequirements(
            limits={
                "memory": "{{ pod_mem() }}",
                "cpu": "{{ pod_cpu() }}",
            }
        ),
        startup_timeout_seconds=1800,
        get_logs=True,
        image_pull_policy="Always",
        config_file="/home/airflow/composer_kube_config",
        dag=dag,
    )

    task_2 = KubernetesPodOperator.partial(
        task_id="task_2",
        name="task_2",
        namespace="default",
        image=dvt_image,
        cmds=["bash", "-cx"],
        service_account_name="sa-k8s",
        container_resources=k8s_models.V1ResourceRequirements(
            limits={
                "memory": "{{ pod_mem() }}",
                "cpu": "{{ pod_cpu() }}",
            }
        ),
        startup_timeout_seconds=1800,
        get_logs=True,
        image_pull_policy="Always",
        config_file="/home/airflow/composer_kube_config",
        dag=dag,
    ).expand(arguments=[["echo hello"]])

    task_1 >> task_2
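Side note, not part of the repro: a minimal workaround sketch we could fall back to is calling pod_mem()/pod_cpu() directly at DAG-parse time, so the mapped task no longer depends on runtime template rendering of container_resources (the task id and name below are hypothetical):

# Hypothetical workaround sketch: resolve the limits when the DAG file is
# parsed instead of via user-defined macros, avoiding Jinja rendering entirely.
task_2_static = KubernetesPodOperator.partial(
    task_id="task_2_static",
    name="task_2_static",
    namespace="default",
    image=dvt_image,
    cmds=["bash", "-cx"],
    service_account_name="sa-k8s",
    container_resources=k8s_models.V1ResourceRequirements(
        limits={
            "memory": pod_mem(),  # evaluated at parse time, not at render time
            "cpu": pod_cpu(),
        }
    ),
    startup_timeout_seconds=1800,
    get_logs=True,
    image_pull_policy="Always",
    config_file="/home/airflow/composer_kube_config",
    dag=dag,
).expand(arguments=[["echo hello"]])

This sidesteps the problem but gives up per-run macro evaluation, so it is not a real fix.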
task_1 (without dynamic task mapping) completes successfully, while task_2 (with dynamic task mapping) fails.
Looking at the error logs, pod creation failed because the {{ pod_cpu() }} and {{ pod_mem() }} calls in the pod spec were left unrendered.
Here is the traceback:
Exception when attempting to create Namespaced Pod: { "apiVersion": "v1", "kind": "Pod", "metadata": { "annotations": {}, "labels": { "dag_id": "sample_dag", "task_id": "task_2", "run_id": "manual__2023-02-08T183926.890852Z-eee90e4ee", "kubernetes_pod_operator": "True", "map_index": "0", "try_number": "2", "airflow_version": "2.4.3-composer", "airflow_kpo_in_cluster": "False" }, "name": "task-2-46f76eb0432d42ae9a331a6fc53835b3", "namespace": "default" }, "spec": { "affinity": {}, "containers": [ { "args": [ "echo hello" ], "command": [ "bash", "-cx" ], "env": [], "envFrom": [], "image": "us.gcr.io/ams-e2e-testing/edw-dvt-tool", "imagePullPolicy": "Always", "name": "base", "ports": [], "resources": { "limits": { "memory": "{{ pod_mem() }}", "cpu": "{{ pod_cpu() }}" } }, "volumeMounts": [] } ], "hostNetwork": false, "imagePullSecrets": [], "initContainers": [], "nodeSelector": {}, "restartPolicy": "Never", "securityContext": {}, "serviceAccountName": "sa-k8s", "tolerations": [], "volumes": [] } }
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 143, in run_pod_async
    resp = self._client.create_namespaced_pod(
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py", line 7356, in create_namespaced_pod
    return self.create_namespaced_pod_with_http_info(namespace, body, **kwargs)  # noqa: E501
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py", line 7455, in create_namespaced_pod_with_http_info
    return self.api_client.call_api(
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 348, in call_api
    return self.__call_api(resource_path, method,
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
    response_data = self.request(
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 391, in request
    return self.rest_client.POST(url,
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/rest.py", line 275, in POST
    return self.request("POST", url,
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/rest.py", line 234, in request
    raise ApiException(http_resp=r)
kubernetes.client.exceptions.ApiException: (400)
Reason: Bad Request
HTTP response headers: HTTPHeaderDict({'Audit-Id': '1ef20c0b-6980-4173-b9cc-9af5b4792e86', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': '1b263a21-4c75-4ef8-8147-c18780a13f0e', 'X-Kubernetes-Pf-Prioritylevel-Uid': '3cd4cda4-908c-4944-a422-5512b0fb88d6', 'Date': 'Wed, 08 Feb 2023 18:45:23 GMT', 'Content-Length': '256'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Pod in version \"v1\" cannot be handled as a Pod: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'","reason":"BadRequest","code":400}
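For what it's worth, the 400 is consistent with the unrendered template reaching the API server: the literal string "{{ pod_mem() }}" cannot be parsed as a Kubernetes resource quantity. A small local sketch (the regex is copied from the error message above; the check itself is ours, not code from Airflow or Kubernetes) illustrating the validation that fails:

import re

# Quantity pattern quoted in the API error above; this is only a local
# illustration of why the unrendered template is rejected.
quantity_re = re.compile(r"^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$")

print(bool(quantity_re.match("4000M")))             # True: a rendered value is a valid quantity
print(bool(quantity_re.match("{{ pod_mem() }}")))   # False: the unrendered template is rejected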
Operating System
Google Composer Kubernetes Cluster
Versions of Apache Airflow Providers
No response
Deployment
Composer
Deployment details
No response
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct