Skip to content

render_template_as_native_obj does not work in combination with the CloudRunExecuteJobOperator #49695

@ramonvermeulen

Description

@ramonvermeulen

Hello Everyone!

Not entirely sure if this is a bug within the CloudRunExecuteJobOperator or a bug afterall (I am quite new to Airflow).

I am running into an issue with render_template_as_native_obj and I can't figure out what is going wrong.
Maybe one of you has an idea why render_template_as_native_obj works correctly on the PythonOperator but seems to fail on the CloudRunExecuteJobOperator?

My DAG:

import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.cloud_run import (
    CloudRunExecuteJobOperator,
)
from airflow.models.param import Param, ParamsDict
from airflow.providers.standard.operators.python import PythonOperator


default_args = {
    "start_date": datetime.datetime(2025, 1, 1),
    "retries": 0,
}

params = ParamsDict(
    {
        "project_id": Param(
            default="XXXXXXXXXXX",
            type="string",
            description="GCP Project ID in which the cloud run job is deployed",
        ),
        "job_name": Param(
            default="XXXXXXXXXXX",
            type="string",
            description="Name of the cloud run job to execute",
        ),
        "region": Param(
            default="europe-west4",
            type="string",
            description="Region in which the cloud run job is deployed",
        ),
        "container_command": Param(
            default=["build", "--target", "test"],
            type="array",
            description="Command to execute in the container",
        ),
        "polling_period_seconds": Param(
            default=5,
            type="integer",
            description="Frequency of polling the underlying cloud run job",
        ),
        "timeout_seconds": Param(
            default=600,
            type="integer",
            description="Timeout for the cloud run job execution to cancel the execution",
        ),
    }
)

with DAG(
    dag_id="ae_adh_dbt",
    default_args=default_args,
    schedule_interval="*/15 * * * *",
    catchup=False,
    tags=["cloud_run"],
    params=params,
    render_template_as_native_obj=True,
) as dag:

    def test(**kwargs):
        print("project_id = " + str(type(kwargs["project_id"])))
        print("polling_period_seconds = " + str(type(kwargs["polling_period_seconds"])))
        print("timeout_seconds = " + str(type(kwargs["timeout_seconds"])))

    logging_job = PythonOperator(
        task_id="logging_job",
        python_callable=test,
        op_kwargs={
            "project_id": "{{ params.project_id }}",
            "polling_period_seconds": "{{ params.polling_period_seconds }}",
            "timeout_seconds": "{{ params.timeout_seconds }}",
        },
    )
    trigger_cloud_run_job = CloudRunExecuteJobOperator(
        task_id="trigger_cloud_run_job",
        project_id="{{ params.project_id }}",
        region="{{ params.region }}",
        job_name="{{ params.job_name }}",
        overrides={
            "container_overrides": [
                {
                    "args": "{{ params.container_command }}",
                }
            ],
        },
        polling_period_seconds="{{ params.polling_period_seconds }}",
        timeout_seconds="{{ params.timeout_seconds }}",
        gcp_conn_id="google_cloud_default",
        deferrable=True,
    )

    logging_job >> trigger_cloud_run_job

The logging_job logs indicates that render_template_as_native_obj works correctly:

[2025-04-24, 08:30:03 UTC] {logging_mixin.py:190} INFO - project_id = <class 'str'>
[2025-04-24, 08:30:03 UTC] {logging_mixin.py:190} INFO - polling_period_seconds = <class 'int'>
[2025-04-24, 08:30:03 UTC] {logging_mixin.py:190} INFO - timeout_seconds = <class 'int'>

But then in the CloudRunExecuteJobOperator I get the following type related error:

[2025-04-24, 09:45:10 CEST] {baseoperator.py:1818} ERROR - Trigger failed:\nTraceback (most recent call last):\n\n  File "/opt/python3.11/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 558, in cleanup_finished_triggers\n    result = details["task"].result()\n             ^^^^^^^^^^^^^^^^^^^^^^^^\n\n  File "/opt/python3.11/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 631, in run_trigger\n    async for event in trigger.run():\n\n  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/cloud/triggers/cloud_run.py", line 133, in run\n    await asyncio.sleep(self.polling_period_seconds)\n\n  File "/opt/python3.11/lib/python3.11/asyncio/tasks.py", line 639, in sleep\n    if delay <= 0:\n       ^^^^^^^^^^\n\nTypeError: '<=' not supported between instances of 'str' and 'int'\n

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

I am using Airflow on cloud-composer, version (composer-3-airflow-2.10.5-build.0):
apache-airflow==2.10.5+composer with apache-airflow-providers-google==14.0.0

Apache Airflow version

2.10.5

Operating System

Not sure

Deployment

Google Cloud Composer

Deployment details

No response

What happened

I get a type error, that polling_period_seconds is of type str when I would expect int instead.

What you think should happen instead

That polling_period_seconds should be an int because I am using render_template_as_native_obj on my DAG. Weirdly in the PythonOperator this works correct (see example).

How to reproduce

Use the provided DAG with a cloud run job instance in the cloud

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    area:providerskind:bugThis is a clearly a bugneeds-triagelabel for new issues that we didn't triage yetprovider:googleGoogle (including GCP) related issues

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions