
Jinja templating doesn't work with container_resources when using dynamic task mapping with Kubernetes Pod Operator #29432

@pshrivastava27

Description

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

Google Cloud Composer Version - 2.1.5
Airflow Version - 2.4.3

We are trying to use dynamic task mapping with the KubernetesPodOperator. Our use case is to return the pod's CPU and memory requirements from functions that are registered as user-defined macros in the DAG.

Without dynamic task mapping this works perfectly, but with dynamic task mapping the macros in container_resources are not rendered.

container_resources is a templated field according to the docs; the feature was introduced in this PR.
We also tried toggling the boolean render_template_as_native_obj, but still no luck.
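For comparison, the rendering we expect is easy to reproduce with plain jinja2 outside Airflow. This is a minimal sketch under simplifying assumptions: the limits are a flat dict rather than a V1ResourceRequirements object, the macros are passed as render context, and render_limits is a hypothetical helper, not how Airflow renders templated fields internally. It also shows that, for these string values, jinja2's NativeEnvironment (Airflow uses a native environment when render_template_as_native_obj=True) gives the same result as the default environment.

```python
from jinja2 import Environment
from jinja2.nativetypes import NativeEnvironment


def pod_mem():
    return "4000M"


def pod_cpu():
    return "1000m"


def render_limits(limits, env, context):
    """Hypothetical helper: render each string value of a flat limits dict with Jinja."""
    return {
        key: env.from_string(value).render(**context) if isinstance(value, str) else value
        for key, value in limits.items()
    }


# The macros are supplied as render context, mirroring user_defined_macros.
macros = {"pod_mem": pod_mem, "pod_cpu": pod_cpu}
limits = {"memory": "{{ pod_mem() }}", "cpu": "{{ pod_cpu() }}"}

plain = render_limits(limits, Environment(), macros)
native = render_limits(limits, NativeEnvironment(), macros)
print(plain)
print(native)
```

Both calls produce {'memory': '4000M', 'cpu': '1000m'}, which is what task_1 effectively sends to Kubernetes.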

Below is a trimmed version of our DAG to help reproduce the issue. (The functions returning CPU and memory are trivial here, just to illustrate the setup.)

What you think should happen instead

It should behave the same with or without dynamic task mapping: the macros in container_resources should be rendered in both cases.
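To make the expectation concrete, here is a deliberately simplified model of partial()/expand() in plain Python. It is not Airflow's implementation: expand_kwargs is a hypothetical helper, and a plain limits dict stands in for container_resources. It illustrates that each mapped instance receives the partial() arguments unchanged plus one expanded value, so the templated limits reach the mapped instance exactly as they are written for task_1 and still need rendering.

```python
from itertools import product


def expand_kwargs(partial_kwargs, **expanded):
    """Hypothetical model of partial()/expand(): yield (map_index, kwargs) per combination.

    Every mapped instance gets the partial() kwargs unchanged, so templated values
    (here the simplified "limits" dict) still have to be rendered per instance.
    """
    names = list(expanded)
    for map_index, values in enumerate(product(*(expanded[n] for n in names))):
        yield map_index, {**partial_kwargs, **dict(zip(names, values))}


partial_kwargs = {
    "task_id": "task_2",
    # Simplified stand-in for container_resources.
    "limits": {"memory": "{{ pod_mem() }}", "cpu": "{{ pod_cpu() }}"},
}
instances = list(expand_kwargs(partial_kwargs, arguments=[["echo hello"]]))
for map_index, kwargs in instances:
    print(map_index, kwargs["arguments"], kwargs["limits"])
```

With a single expanded value, as in task_2, there is exactly one instance, matching the map_index 0 label in the Pod spec from the logs.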

How to reproduce

We deployed the following DAG in Google Cloud Composer.

import datetime
import os

from airflow import models
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s_models

dvt_image = os.environ.get("DVT_IMAGE")

default_dag_args = {"start_date": datetime.datetime(2022, 1, 1)}


def pod_mem():
    return "4000M"


def pod_cpu():
    return "1000m"


with models.DAG(
    "sample_dag",
    schedule_interval=None,
    default_args=default_dag_args,
    render_template_as_native_obj=True,
    user_defined_macros={
        "pod_mem": pod_mem,
        "pod_cpu": pod_cpu,
    },
) as dag:

    # Unmapped task: the templated limits below are rendered as expected.
    task_1 = KubernetesPodOperator(
        task_id="task_1",
        name="task_1",
        namespace="default",
        image=dvt_image,
        cmds=["bash", "-cx"],
        arguments=["echo hello"],
        service_account_name="sa-k8s",
        container_resources=k8s_models.V1ResourceRequirements(
            limits={
                "memory": "{{ pod_mem() }}",
                "cpu": "{{ pod_cpu() }}",
            }
        ),
        startup_timeout_seconds=1800,
        get_logs=True,
        image_pull_policy="Always",
        config_file="/home/airflow/composer_kube_config",
        dag=dag,
    )

    # Mapped task: the same templated limits reach Kubernetes unrendered.
    task_2 = KubernetesPodOperator.partial(
        task_id="task_2",
        name="task_2",
        namespace="default",
        image=dvt_image,
        cmds=["bash", "-cx"],
        service_account_name="sa-k8s",
        container_resources=k8s_models.V1ResourceRequirements(
            limits={
                "memory": "{{ pod_mem() }}",
                "cpu": "{{ pod_cpu() }}",
            }
        ),
        startup_timeout_seconds=1800,
        get_logs=True,
        image_pull_policy="Always",
        config_file="/home/airflow/composer_kube_config",
        dag=dag,
    ).expand(arguments=[["echo hello"]])

    task_1 >> task_2

task_1 (without dynamic task mapping) completes successfully, while task_2 (with dynamic task mapping) fails.

Looking at the error logs, the Pod creation request was rejected because the resource limits were sent as the literal, unrendered strings {{ pod_mem() }} and {{ pod_cpu() }}, which are not valid Kubernetes quantities.
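One way to catch this failure mode before the request reaches the API server is to scan the resource limits for leftover Jinja delimiters. This is a minimal stdlib sketch; find_unrendered is a hypothetical helper that only looks for the {{ and {% markers, so it is a heuristic rather than a full template check.

```python
import re

# Matches the opening of a Jinja expression ("{{") or statement ("{%").
_JINJA_MARKER = re.compile(r"\{\{|\{%")


def find_unrendered(values):
    """Hypothetical helper: return the keys whose string value still looks like a Jinja template."""
    return sorted(
        key
        for key, value in values.items()
        if isinstance(value, str) and _JINJA_MARKER.search(value)
    )


# Limits as they appear in the task_2 Pod spec in the logs, versus the expected rendered values.
mapped_limits = {"memory": "{{ pod_mem() }}", "cpu": "{{ pod_cpu() }}"}
rendered_limits = {"memory": "4000M", "cpu": "1000m"}

print(find_unrendered(mapped_limits))
print(find_unrendered(rendered_limits))
```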

Here is the traceback:

Exception when attempting to create Namespaced Pod: { "apiVersion": "v1", "kind": "Pod", "metadata": { "annotations": {}, "labels": { "dag_id": "sample_dag", "task_id": "task_2", "run_id": "manual__2023-02-08T183926.890852Z-eee90e4ee", "kubernetes_pod_operator": "True", "map_index": "0", "try_number": "2", "airflow_version": "2.4.3-composer", "airflow_kpo_in_cluster": "False" }, "name": "task-2-46f76eb0432d42ae9a331a6fc53835b3", "namespace": "default" }, "spec": { "affinity": {}, "containers": [ { "args": [ "echo hello" ], "command": [ "bash", "-cx" ], "env": [], "envFrom": [], "image": "us.gcr.io/ams-e2e-testing/edw-dvt-tool", "imagePullPolicy": "Always", "name": "base", "ports": [], "resources": { "limits": { "memory": "{{ pod_mem() }}", "cpu": "{{ pod_cpu() }}" } }, "volumeMounts": [] } ], "hostNetwork": false, "imagePullSecrets": [], "initContainers": [], "nodeSelector": {}, "restartPolicy": "Never", "securityContext": {}, "serviceAccountName": "sa-k8s", "tolerations": [], "volumes": [] } }
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 143, in run_pod_async
    resp = self._client.create_namespaced_pod(
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py", line 7356, in create_namespaced_pod
    return self.create_namespaced_pod_with_http_info(namespace, body, **kwargs) # noqa: E501
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py", line 7455, in create_namespaced_pod_with_http_info
    return self.api_client.call_api(
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 348, in call_api
    return self.__call_api(resource_path, method,
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
    response_data = self.request(
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 391, in request
    return self.rest_client.POST(url,
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/rest.py", line 275, in POST
    return self.request("POST", url,
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/rest.py", line 234, in request
    raise ApiException(http_resp=r)
kubernetes.client.exceptions.ApiException: (400)
Reason: Bad Request
HTTP response headers: HTTPHeaderDict({'Audit-Id': '1ef20c0b-6980-4173-b9cc-9af5b4792e86', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': '1b263a21-4c75-4ef8-8147-c18780a13f0e', 'X-Kubernetes-Pf-Prioritylevel-Uid': '3cd4cda4-908c-4944-a422-5512b0fb88d6', 'Date': 'Wed, 08 Feb 2023 18:45:23 GMT', 'Content-Length': '256'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Pod in version \"v1\" cannot be handled as a Pod: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'","reason":"BadRequest","code":400}
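The 400 response comes from Kubernetes quantity parsing. As a rough pre-flight check, the regular expression the API server reports in this error can be applied locally with Python's re module. This is only a first-pass sketch: is_valid_quantity is a hypothetical helper, and the server's real parser applies additional rules, so passing this regex does not guarantee the value is accepted.

```python
import re

# Quantity pattern as reported by the API server in its error message.
QUANTITY_RE = re.compile(r"^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$")


def is_valid_quantity(value: str) -> bool:
    """First-pass check only: the API server applies further parsing rules."""
    return QUANTITY_RE.match(value) is not None


for q in ["4000M", "1000m", "{{ pod_mem() }}", "{{ pod_cpu() }}"]:
    print(f"{q!r}: {is_valid_quantity(q)}")
```

The rendered values from task_1 pass, while the unrendered template strings from task_2 fail at the first character, which is consistent with the Bad Request above.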

Operating System

Google Composer Kubernetes Cluster

Versions of Apache Airflow Providers

No response

Deployment

Composer

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
