Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions airflow/providers/dbt/cloud/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,22 @@
Changelog
---------

3.0.0
.....

Breaking changes
~~~~~~~~~~~~~~~~

Beginning with version 2.0.0, users could specify single-tenant dbt Cloud domains via the ``schema`` parameter
in an Airflow connection. Subsequently in version 2.3.1, users could also connect to the dbt Cloud instances
outside of the US region as well as private instances by using the ``host`` parameter of their Airflow
connection to specify the entire tenant domain. Backwards compatibility for using ``schema`` was left in
place. Version 3.0.0 removes support for using ``schema`` to specify the tenant domain of a dbt Cloud
instance. If you wish to connect to a single-tenant, instance outside of the US, or a private instance, you
must use the ``host`` parameter to specify the _entire_ tenant domain name in your Airflow connection.

* ``Drop Connection.schema use in DbtCloudHook (#29166)``

2.3.1
.....

Expand Down
29 changes: 8 additions & 21 deletions airflow/providers/dbt/cloud/hooks/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import json
import time
import warnings
from enum import Enum
from functools import wraps
from inspect import signature
Expand Down Expand Up @@ -181,33 +180,33 @@ def __init__(self, dbt_cloud_conn_id: str = default_conn_name, *args, **kwargs)
super().__init__(auth_type=TokenAuth)
self.dbt_cloud_conn_id = dbt_cloud_conn_id

@staticmethod
def _get_tenant_domain(conn: Connection) -> str:
return conn.host or "cloud.getdbt.com"

@staticmethod
def get_request_url_params(
tenant: str, endpoint: str, include_related: list[str] | None = None
) -> tuple[str, dict[str, Any]]:
"""
Form URL from base url and endpoint url

:param tenant: The tenant name which is need to be replaced in base url.
:param tenant: The tenant domain name which is need to be replaced in base url.
:param endpoint: Endpoint url to be requested.
:param include_related: Optional. List of related fields to pull with the run.
Valid values are "trigger", "job", "repository", and "environment".
"""
data: dict[str, Any] = {}
base_url = f"https://{tenant}.getdbt.com/api/v2/accounts/"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Old logic that is no longer valid.

if include_related:
data = {"include_related": include_related}
if base_url and not base_url.endswith("/") and endpoint and not endpoint.startswith("/"):
url = base_url + "/" + endpoint
else:
url = (base_url or "") + (endpoint or "")
Copy link
Contributor Author

@josh-fell josh-fell Jan 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactoring the branching here since the else would never be executed anyway.

url = f"https://{tenant}/api/v2/accounts/{endpoint or ''}"
return url, data

async def get_headers_tenants_from_connection(self) -> tuple[dict[str, Any], str]:
"""Get Headers, tenants from the connection details"""
headers: dict[str, Any] = {}
connection: Connection = await sync_to_async(self.get_connection)(self.dbt_cloud_conn_id)
tenant: str = connection.schema if connection.schema else "cloud"
tenant = self._get_tenant_domain(connection)
package_name, provider_version = _get_provider_info()
headers["User-Agent"] = f"{package_name}-v{provider_version}"
headers["Content-Type"] = "application/json"
Expand Down Expand Up @@ -267,19 +266,7 @@ def connection(self) -> Connection:
return _connection

def get_conn(self, *args, **kwargs) -> Session:
if self.connection.schema:
warnings.warn(
"The `schema` parameter is deprecated and use within a dbt Cloud connection will be removed "
"in a future version. Please use `host` instead and specify the entire tenant domain name.",
category=DeprecationWarning,
stacklevel=2,
)
# Prior to deprecation, the connection.schema value could _only_ modify the third-level
# domain value while '.getdbt.com' was always used as the remainder of the domain name.
tenant = f"{self.connection.schema}.getdbt.com"
else:
tenant = self.connection.host or "cloud.getdbt.com"

tenant = self._get_tenant_domain(self.connection)
self.base_url = f"https://{tenant}/api/v2/accounts/"

session = Session()
Expand Down
1 change: 1 addition & 0 deletions airflow/providers/dbt/cloud/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ description: |
`dbt Cloud <https://www.getdbt.com/product/what-is-dbt/>`__

versions:
- 3.0.0
- 2.3.1
- 2.3.0
- 2.2.0
Expand Down