Description
Hello Everyone!
Not entirely sure if this is a bug within the CloudRunExecuteJobOperator or not a bug at all (I am quite new to Airflow).
I am running into an issue with render_template_as_native_obj and I can't figure out what is going wrong.
Maybe one of you has an idea why render_template_as_native_obj works correctly on the PythonOperator but seems to fail on the CloudRunExecuteJobOperator?
My DAG:
import datetime
from airflow import DAG
from airflow.providers.google.cloud.operators.cloud_run import (
    CloudRunExecuteJobOperator,
)
from airflow.models.param import Param, ParamsDict
from airflow.providers.standard.operators.python import PythonOperator

default_args = {
    "start_date": datetime.datetime(2025, 1, 1),
    "retries": 0,
}

params = ParamsDict(
    {
        "project_id": Param(
            default="XXXXXXXXXXX",
            type="string",
            description="GCP Project ID in which the cloud run job is deployed",
        ),
        "job_name": Param(
            default="XXXXXXXXXXX",
            type="string",
            description="Name of the cloud run job to execute",
        ),
        "region": Param(
            default="europe-west4",
            type="string",
            description="Region in which the cloud run job is deployed",
        ),
        "container_command": Param(
            default=["build", "--target", "test"],
            type="array",
            description="Command to execute in the container",
        ),
        "polling_period_seconds": Param(
            default=5,
            type="integer",
            description="Frequency of polling the underlying cloud run job",
        ),
        "timeout_seconds": Param(
            default=600,
            type="integer",
            description="Timeout for the cloud run job execution to cancel the execution",
        ),
    }
)

with DAG(
    dag_id="ae_adh_dbt",
    default_args=default_args,
    schedule_interval="*/15 * * * *",
    catchup=False,
    tags=["cloud_run"],
    params=params,
    render_template_as_native_obj=True,
) as dag:

    def test(**kwargs):
        print("project_id = " + str(type(kwargs["project_id"])))
        print("polling_period_seconds = " + str(type(kwargs["polling_period_seconds"])))
        print("timeout_seconds = " + str(type(kwargs["timeout_seconds"])))

    logging_job = PythonOperator(
        task_id="logging_job",
        python_callable=test,
        op_kwargs={
            "project_id": "{{ params.project_id }}",
            "polling_period_seconds": "{{ params.polling_period_seconds }}",
            "timeout_seconds": "{{ params.timeout_seconds }}",
        },
    )

    trigger_cloud_run_job = CloudRunExecuteJobOperator(
        task_id="trigger_cloud_run_job",
        project_id="{{ params.project_id }}",
        region="{{ params.region }}",
        job_name="{{ params.job_name }}",
        overrides={
            "container_overrides": [
                {
                    "args": "{{ params.container_command }}",
                }
            ],
        },
        polling_period_seconds="{{ params.polling_period_seconds }}",
        timeout_seconds="{{ params.timeout_seconds }}",
        gcp_conn_id="google_cloud_default",
        deferrable=True,
    )

    logging_job >> trigger_cloud_run_job
The logging_job logs indicate that render_template_as_native_obj works correctly:
[2025-04-24, 08:30:03 UTC] {logging_mixin.py:190} INFO - project_id = <class 'str'>
[2025-04-24, 08:30:03 UTC] {logging_mixin.py:190} INFO - polling_period_seconds = <class 'int'>
[2025-04-24, 08:30:03 UTC] {logging_mixin.py:190} INFO - timeout_seconds = <class 'int'>
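For context, template rendering (and therefore native-object rendering) only applies to arguments an operator declares in its template_fields, so one way to narrow down the difference is to compare the two operator classes. A quick diagnostic sketch, assuming the same imports and provider versions as in the DAG above:

# render_template_as_native_obj only affects fields that are actually rendered,
# i.e. those listed in an operator's template_fields.
from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator
from airflow.providers.standard.operators.python import PythonOperator

# "op_kwargs" is part of PythonOperator.template_fields, which is why the
# logging_job above receives native Python types.
print(PythonOperator.template_fields)

# If "polling_period_seconds" / "timeout_seconds" are missing from this list,
# those arguments are never rendered and stay plain strings.
print(CloudRunExecuteJobOperator.template_fields)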
But then in the CloudRunExecuteJobOperator I get the following type related error:
[2025-04-24, 09:45:10 CEST] {baseoperator.py:1818} ERROR - Trigger failed:
Traceback (most recent call last):

  File "/opt/python3.11/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 558, in cleanup_finished_triggers
    result = details["task"].result()
             ^^^^^^^^^^^^^^^^^^^^^^^^

  File "/opt/python3.11/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 631, in run_trigger
    async for event in trigger.run():

  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/cloud/triggers/cloud_run.py", line 133, in run
    await asyncio.sleep(self.polling_period_seconds)

  File "/opt/python3.11/lib/python3.11/asyncio/tasks.py", line 639, in sleep
    if delay <= 0:
       ^^^^^^^^^^

TypeError: '<=' not supported between instances of 'str' and 'int'
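The failure itself can be reproduced outside Airflow: as the traceback shows, asyncio.sleep compares the delay against 0 as its first step, so any string value (whether an unrendered "{{ ... }}" template or a rendered-but-uncast "5") raises the same error. A minimal sketch:

# Minimal reproduction of the comparison failure from the traceback:
# asyncio.sleep() evaluates `delay <= 0`, which raises for a str delay.
import asyncio

async def main():
    await asyncio.sleep("5")  # TypeError: '<=' not supported between instances of 'str' and 'int'

asyncio.run(main())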
Apache Airflow Provider(s)
google
Versions of Apache Airflow Providers
I am using Airflow on Cloud Composer, version composer-3-airflow-2.10.5-build.0:
apache-airflow==2.10.5+composer with apache-airflow-providers-google==14.0.0
Apache Airflow version
2.10.5
Operating System
Not sure
Deployment
Google Cloud Composer
Deployment details
No response
What happened
I get a TypeError because polling_period_seconds is of type str when I would expect int instead.
What you think should happen instead
That polling_period_seconds should be an int, because I am using render_template_as_native_obj on my DAG. Weirdly, with the PythonOperator this works correctly (see example above).
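A possible interim workaround, assuming polling_period_seconds and timeout_seconds are not templated on this operator, would be to pass plain integers for the numeric arguments and keep Jinja templating only for the fields that are rendered. A sketch based on the task above:

# Hedged workaround sketch: hard-code the numeric arguments as Python ints
# instead of template strings; keep templating for the rendered fields.
trigger_cloud_run_job = CloudRunExecuteJobOperator(
    task_id="trigger_cloud_run_job",
    project_id="{{ params.project_id }}",
    region="{{ params.region }}",
    job_name="{{ params.job_name }}",
    overrides={
        "container_overrides": [
            {"args": "{{ params.container_command }}"},
        ],
    },
    polling_period_seconds=5,   # plain int instead of "{{ params.polling_period_seconds }}"
    timeout_seconds=600,        # plain int instead of "{{ params.timeout_seconds }}"
    gcp_conn_id="google_cloud_default",
    deferrable=True,
)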
How to reproduce
Use the provided DAG with a Cloud Run job deployed in GCP.
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct