DbtCloudHook.wait_for_job_run_status can silently succeed on terminal job failure #61297

@SameerMesiah97

Description

Apache Airflow Provider(s)

dbt-cloud

Versions of Apache Airflow Providers

apache-airflow-providers-dbt-cloud>=4.6.4rc1

Apache Airflow version

main

Operating System

Debian GNU/Linux 12 (bookworm)

Deployment

Other

Deployment details

No response

What happened

When calling DbtCloudHook.wait_for_job_run_status directly from a task, a dbt Cloud job that reaches a terminal failure state (ERROR or CANCELLED) can cause the Airflow task to succeed silently instead of failing.

wait_for_job_run_status blocks until the job run completes. However, if the job run reaches a terminal failure state before reaching the expected status (default: SUCCESS), the method returns False instead of raising an exception. In an Airflow context, where task success and failure are exception-driven, this allows task execution to complete successfully even though the external dbt Cloud job failed.
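The current semantics can be sketched with a toy polling loop (the get_status callable and the local wait_for_status function are illustrative stand-ins, not the provider's code):

```python
# Toy model of the current behavior: block until a terminal state,
# then report the outcome only as a boolean. A caller who discards the
# return value never learns that the run failed.
TERMINAL_STATES = {"SUCCESS", "ERROR", "CANCELLED"}

def wait_for_status(get_status, expected_status="SUCCESS"):
    """Poll get_status() until a terminal state; return True only on a match."""
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status == expected_status

# A run ending in ERROR yields False -- no exception is raised, so an
# Airflow task that ignores the return value still succeeds.
print(wait_for_status(lambda: "ERROR"))  # False
```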

This behavior is surprising given the blocking nature of the method and its default expectation of SUCCESS.

What you think should happen instead

If DbtCloudHook.wait_for_job_run_status encounters an unexpected terminal state (ERROR or CANCELLED) while waiting for the expected status, it should raise DbtCloudJobRunException instead of returning False.

This ensures:

  • Airflow task state correctly reflects dbt Cloud job outcome
  • Terminal job failures are not silently suppressed
  • Callers can safely use wait_for_job_run_status as a synchronization primitive
  • Behavior is consistent with Airflow’s exception-driven execution model
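The proposed behavior can be sketched the same way (DbtCloudJobRunException is the provider's existing exception class; the local class below is a stand-in that keeps the sketch self-contained):

```python
TERMINAL_STATES = {"SUCCESS", "ERROR", "CANCELLED"}

class DbtCloudJobRunException(Exception):
    """Stand-in for the provider's DbtCloudJobRunException."""

def wait_for_status(get_status, expected_status="SUCCESS"):
    """Poll until a terminal state; raise on an unexpected one.

    This is a simplified illustration of the proposed fix, not a patch
    against the actual hook implementation.
    """
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            if status != expected_status:
                raise DbtCloudJobRunException(
                    f"Job run reached terminal state {status!r}; "
                    f"expected {expected_status!r}."
                )
            return True
```

With this change, a failed run propagates as an exception, so the surrounding Airflow task fails without any extra handling by the caller.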

How to reproduce

  1. Create a dbt Cloud job that fails (for example, a job with invalid SQL).
  2. Configure a dbt Cloud connection in Airflow.
    (The connection ID dbt_cloud_default is used for this reproduction.)
  3. Create the following DAG:
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook


@task
def wait_for_failed_dbt_job():
    hook = DbtCloudHook(dbt_cloud_conn_id="dbt_cloud_default")

    response = hook.trigger_job_run(
        job_id=<FAILED_JOB_ID>,
        cause="airflow repro",
    )

    run_id = response.json()["data"]["id"]

    hook.wait_for_job_run_status(
        run_id=run_id,
        check_interval=10,
        timeout=600,
    )


with DAG(
    dag_id="dbt_cloud_wait_for_job_run_status_silent_failure",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    wait_for_failed_dbt_job()
  4. Trigger the DAG and wait for the dbt Cloud job to complete and reach a terminal failure state (ERROR or CANCELLED).

Observed Behavior

The Airflow task completes successfully without raising an exception.

Anything else

wait_for_job_run_status is part of the public dbt Cloud hook API and is callable directly from user code. Given its blocking behavior and default expectation of SUCCESS, it is reasonable for users to rely on this method as a synchronization primitive and expect terminal job failures to raise exceptions.

Requiring callers to explicitly inspect a boolean return value to detect terminal failure states is error-prone in an Airflow context, where task success and failure are exception-driven. Supporting safe direct usage of this method helps prevent silent failures and aligns its behavior with common Airflow execution patterns.
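Until the hook raises, callers must guard the boolean return themselves. A sketch of that interim workaround (FakeHook is a stand-in so the snippet runs without a dbt Cloud connection; with the real DbtCloudHook the wrapper would be identical, raising AirflowException rather than RuntimeError):

```python
class FakeHook:
    """Stand-in for DbtCloudHook, simulating a run that ended in ERROR."""

    def wait_for_job_run_status(self, run_id, check_interval=10, timeout=600):
        return False  # terminal failure state reached before SUCCESS

def wait_or_raise(hook, run_id):
    """Turn a False return into an exception so the task fails with the run."""
    ok = hook.wait_for_job_run_status(
        run_id=run_id, check_interval=10, timeout=600
    )
    if not ok:
        # In a real Airflow task, raise AirflowException here instead.
        raise RuntimeError(f"dbt Cloud job run {run_id} did not reach SUCCESS")
    return ok
```

This boilerplate is exactly what the issue argues every caller should not have to write.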

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
