Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions providers/src/airflow/providers/dbt/cloud/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,17 @@
Changelog
---------

main
.....

.. warning::
All deprecated classes, parameters and features have been removed from the DBT provider package.
The following breaking changes were introduced:

* Sensors
* Remove ``airflow.providers.dbt.cloud.sensors.dbt.DbtCloudJobRunAsyncSensor``. Use ``airflow.providers.dbt.cloud.sensors.dbt.DbtCloudJobRunSensor`` with ``deferrable`` set to ``True`` instead.
* Removed ``polling_interval`` parameter from ``DbtCloudJobRunSensor``. Use ``poke_interval`` instead.

3.11.2
......

Expand Down
36 changes: 2 additions & 34 deletions providers/src/airflow/providers/dbt/cloud/sensors/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@
from __future__ import annotations

import time
import warnings
from functools import cached_property
from typing import TYPE_CHECKING, Any

from deprecated import deprecated

from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.exceptions import AirflowException
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, DbtCloudJobRunException, DbtCloudJobRunStatus
from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger
from airflow.providers.dbt.cloud.utils.openlineage import generate_openlineage_events_from_dbt_cloud_run
Expand Down Expand Up @@ -62,17 +59,7 @@ def __init__(
) -> None:
if deferrable:
if "poke_interval" not in kwargs:
# TODO: Remove once deprecated
if "polling_interval" in kwargs:
kwargs["poke_interval"] = kwargs["polling_interval"]
warnings.warn(
"Argument `poll_interval` is deprecated and will be removed "
"in a future release. Please use `poke_interval` instead.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
else:
kwargs["poke_interval"] = 5
kwargs["poke_interval"] = 5

if "timeout" not in kwargs:
kwargs["timeout"] = 60 * 60 * 24 * 7
Expand Down Expand Up @@ -142,22 +129,3 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> int:
def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage:
"""Implement _on_complete because job_run needs to be triggered first in execute method."""
return generate_openlineage_events_from_dbt_cloud_run(operator=self, task_instance=task_instance)


@deprecated(
reason=(
"Class `DbtCloudJobRunAsyncSensor` is deprecated and will be removed in a future release. "
"Please use `DbtCloudJobRunSensor` and set `deferrable` attribute to `True` instead"
),
category=AirflowProviderDeprecationWarning,
)
class DbtCloudJobRunAsyncSensor(DbtCloudJobRunSensor):
"""
This class is deprecated.

Please use :class:`airflow.providers.dbt.cloud.sensor.dbt.DbtCloudJobRunSensor`
with ``deferrable=True``.
"""

def __init__(self, **kwargs: Any) -> None:
super().__init__(deferrable=True, **kwargs)
69 changes: 1 addition & 68 deletions providers/tests/dbt/cloud/sensors/test_dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@

from airflow.exceptions import (
AirflowException,
AirflowProviderDeprecationWarning,
TaskDeferred,
)
from airflow.models.connection import Connection
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, DbtCloudJobRunException, DbtCloudJobRunStatus
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunAsyncSensor, DbtCloudJobRunSensor
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor
from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger
from airflow.utils import db

Expand Down Expand Up @@ -168,69 +167,3 @@ def test_execute_complete_failure(self, mock_status, mock_message):
task.execute_complete(
context={}, event={"status": mock_status, "message": mock_message, "run_id": self.DBT_RUN_ID}
)


class TestDbtCloudJobRunAsyncSensor:
TASK_ID = "dbt_cloud_run_job"
CONN_ID = "dbt_cloud_default"
DBT_RUN_ID = 1234
TIMEOUT = 300

depcrecation_message = (
"Class `DbtCloudJobRunAsyncSensor` is deprecated and will be removed in a future release. "
"Please use `DbtCloudJobRunSensor` and set `deferrable` attribute to `True` instead"
)

@mock.patch("airflow.providers.dbt.cloud.sensors.dbt.DbtCloudHook")
def test_dbt_job_run_async_sensor(self, mock_hook):
"""Assert execute method defer for Dbt cloud job run status sensors"""

with pytest.warns(AirflowProviderDeprecationWarning, match=self.depcrecation_message):
task = DbtCloudJobRunAsyncSensor(
dbt_cloud_conn_id=self.CONN_ID,
task_id=self.TASK_ID,
run_id=self.DBT_RUN_ID,
timeout=self.TIMEOUT,
)
mock_hook.return_value.get_job_run_status.return_value = DbtCloudJobRunStatus.STARTING.value
with pytest.raises(TaskDeferred) as exc:
task.execute({})
assert isinstance(exc.value.trigger, DbtCloudRunJobTrigger), "Trigger is not a DbtCloudRunJobTrigger"

def test_dbt_job_run_async_sensor_execute_complete_success(self):
"""Assert execute_complete log success message when trigger fire with target status"""
with pytest.warns(AirflowProviderDeprecationWarning, match=self.depcrecation_message):
task = DbtCloudJobRunAsyncSensor(
dbt_cloud_conn_id=self.CONN_ID,
task_id=self.TASK_ID,
run_id=self.DBT_RUN_ID,
timeout=self.TIMEOUT,
)

msg = f"Job run {self.DBT_RUN_ID} has completed successfully."
with mock.patch.object(task.log, "info") as mock_log_info:
task.execute_complete(
context={}, event={"status": "success", "message": msg, "run_id": self.DBT_RUN_ID}
)
mock_log_info.assert_called_with(msg)

@pytest.mark.parametrize(
"mock_status, mock_message",
[
("cancelled", "Job run 1234 has been cancelled."),
("error", "Job run 1234 has failed."),
],
)
def test_dbt_job_run_async_sensor_execute_complete_failure(self, mock_status, mock_message):
"""Assert execute_complete method to raise exception on the cancelled and error status"""
with pytest.warns(AirflowProviderDeprecationWarning, match=self.depcrecation_message):
task = DbtCloudJobRunAsyncSensor(
dbt_cloud_conn_id=self.CONN_ID,
task_id=self.TASK_ID,
run_id=self.DBT_RUN_ID,
timeout=self.TIMEOUT,
)
with pytest.raises(AirflowException):
task.execute_complete(
context={}, event={"status": mock_status, "message": mock_message, "run_id": self.DBT_RUN_ID}
)