Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions airflow/providers/dbt/cloud/operators/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from typing import TYPE_CHECKING, Any

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator, BaseOperatorLink, XCom
from airflow.providers.dbt.cloud.hooks.dbt import (
DbtCloudHook,
Expand Down Expand Up @@ -189,15 +188,13 @@ def execute(self, context: Context):
return self.run_id

def execute_complete(self, context: Context, event: dict[str, Any]) -> int:
"""
Execute when the trigger fires - returns immediately.

Relies on trigger to throw an exception, otherwise it assumes execution was successful.
"""
if event["status"] == "error":
raise AirflowException(event["message"])
self.log.info(event["message"])
"""Execute when the trigger fires - returns immediately."""
self.run_id = event["run_id"]
if event["status"] == "cancelled":
raise DbtCloudJobRunException(f"Job run {self.run_id} has been cancelled.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has it been discussed somewhere already as to whether we want to raise an error when the job may have deliberately been cancelled by the user?
Just unsure if we want to raise an error in such scenarios, looking for more thoughts.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It currently works like this in the non-deferrable mode. I also think it makes sense to prevent the start of any following tasks or DAGs when the dbt job gets cancelled.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, makes sense 👍🏽

elif event["status"] == "error":
raise DbtCloudJobRunException(f"Job run {self.run_id} has failed.")
self.log.info(event["message"])
return int(event["run_id"])

def on_kill(self) -> None:
Expand Down