Description
Apache Airflow Provider(s)
dbt-cloud
Versions of Apache Airflow Providers
apache-airflow-providers-dbt-cloud>=4.6.4rc1
Apache Airflow version
main
Operating System
Debian GNU/Linux 12 (bookworm)
Deployment
Other
Deployment details
No response
What happened
When calling DbtCloudHook.wait_for_job_run_status directly from a task, a dbt Cloud job that reaches a terminal failure state (ERROR or CANCELLED) can cause the Airflow task to succeed silently instead of failing.
wait_for_job_run_status blocks until the job run completes. However, if the job run reaches a terminal failure state before reaching the expected status (default: SUCCESS), the method returns False instead of raising an exception. In an Airflow context, where task success and failure are exception-driven, this allows task execution to complete successfully even though the external dbt Cloud job failed.
This behavior is surprising given the blocking nature of the method and its default expectation of SUCCESS.
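To illustrate why a boolean return cannot fail a task, here is a minimal, self-contained sketch (the stub function and task body are hypothetical stand-ins, not provider code): Airflow only marks a task failed when an exception propagates out of it.

```python
# Minimal illustration: Airflow marks a task failed only when an
# exception propagates out of the task callable.
def wait_for_job_run_status_stub() -> bool:
    """Hypothetical stand-in for the hook method: the job run hit
    a terminal failure state, so it returns False instead of raising."""
    return False


def task_body() -> str:
    # The boolean return is easy to ignore; no exception is raised,
    # so Airflow would mark this task successful despite the failure.
    wait_for_job_run_status_stub()
    return "task completed"
```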
What you think should happen instead
If DbtCloudHook.wait_for_job_run_status reaches an unexpected terminal state (ERROR or CANCELLED) before reaching the expected status, wait_for_job_run_status should raise DbtCloudJobRunException.
This ensures:
- Airflow task state correctly reflects dbt Cloud job outcome
- Terminal job failures are not silently suppressed
- Callers can safely use wait_for_job_run_status as a synchronization primitive
- Behavior is consistent with Airflow's exception-driven execution model
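The proposed check could look roughly like the following. This is a simplified sketch, not the provider's actual polling loop: the numeric status codes mirror dbt Cloud's run statuses as used by the provider (ERROR = 20, CANCELLED = 30, SUCCESS = 10), and the real method polls the API between checks, which is elided here.

```python
# Sketch of the proposed terminal-state check. The real hook polls
# the dbt Cloud API in a loop; only the per-poll decision is shown.
SUCCESS = 10
TERMINAL_FAILURE_STATUSES = {20, 30}  # 20 = ERROR, 30 = CANCELLED


class DbtCloudJobRunException(Exception):
    """Raised when a job run ends in an unexpected terminal state."""


def check_terminal_status(current_status: int, expected_statuses: set[int]) -> bool:
    """Return True when an expected status is reached; raise on terminal failure."""
    if current_status in expected_statuses:
        return True
    if current_status in TERMINAL_FAILURE_STATUSES:
        # Proposed change: raise instead of silently returning False.
        raise DbtCloudJobRunException(
            f"Job run ended in unexpected terminal status {current_status}."
        )
    return False  # still running; the hook would keep polling
```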
How to reproduce
- Create a dbt Cloud job that fails (for example, a job with invalid SQL).
- Configure a dbt Cloud connection in Airflow. (The connection ID dbt_cloud_default is used for this reproduction.)
- Create the following DAG:
```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook


@task
def wait_for_failed_dbt_job():
    hook = DbtCloudHook(dbt_cloud_conn_id="dbt_cloud_default")
    response = hook.trigger_job_run(
        job_id=<FAILED_JOB_ID>,
        cause="airflow repro",
    )
    run_id = response.json()["data"]["id"]
    hook.wait_for_job_run_status(
        run_id=run_id,
        check_interval=10,
        timeout=600,
    )


with DAG(
    dag_id="dbt_cloud_wait_for_job_run_status_silent_failure",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    wait_for_failed_dbt_job()
```

- Trigger the DAG and wait for the dbt Cloud job to complete and reach a terminal failure state (ERROR or CANCELLED).
Observed Behavior
The Airflow task completes successfully without raising an exception.
Anything else
wait_for_job_run_status is part of the public dbt Cloud hook API and is callable directly from user code. Given its blocking behavior and default expectation of SUCCESS, it is reasonable for users to rely on this method as a synchronization primitive and expect terminal job failures to raise exceptions.
Requiring callers to explicitly inspect a boolean return value to detect terminal failure states is error-prone in an Airflow context, where task success and failure are exception-driven. Supporting safe direct usage of this method helps prevent silent failures and aligns its behavior with common Airflow execution patterns.
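Until the hook raises on its own, callers can guard the boolean return themselves. The following is a hedged workaround sketch (the wrapper name and RuntimeError choice are illustrative; `hook` is anything exposing the provider's wait_for_job_run_status signature):

```python
# Workaround sketch: treat a False return from wait_for_job_run_status
# as a failure instead of letting the task succeed silently.
def wait_or_fail(hook, run_id: int, check_interval: int = 10, timeout: int = 600) -> None:
    reached = hook.wait_for_job_run_status(
        run_id=run_id,
        check_interval=check_interval,
        timeout=timeout,
    )
    if not reached:
        # Raising here makes the Airflow task fail, matching the
        # exception-driven model described above.
        raise RuntimeError(f"dbt Cloud job run {run_id} did not reach SUCCESS.")
```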
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct