Skip to content
22 changes: 22 additions & 0 deletions docs/apache-airflow-providers-dbt-cloud/operators.rst
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,28 @@ via the ``additional_run_config`` dictionary.
:start-after: [START howto_operator_dbt_cloud_run_job_async]
:end-before: [END howto_operator_dbt_cloud_run_job_async]

You can trigger a dbt Cloud job in two ways:

1. Directly using the ``job_id`` parameter
2. Looking up the job using a combination of identifiers:

* Project: either ``project_id`` or ``project_name``
* Environment: either ``environment_id`` or ``environment_name``
* Job: ``job_name``

When using the lookup method, you must provide either IDs or names for both the project and environment,
along with the job name. For example, you could use ``project_name``, ``environment_id``, and ``job_name``,
or ``project_id``, ``environment_name``, and ``job_name``.

Please note that the lookup method will only work if the provided combination uniquely identifies a job
in your account. The job name must be unique within the specified project and environment.

.. exampleinclude:: /../../providers/tests/system/dbt/cloud/example_dbt_cloud.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dbt_cloud_run_job_without_job_id]
:end-before: [END howto_operator_dbt_cloud_run_job_without_job_id]

.. _howto/operator:DbtCloudJobRunSensor:

Poll for status of a dbt Cloud Job run
Expand Down
177 changes: 173 additions & 4 deletions providers/src/airflow/providers/dbt/cloud/hooks/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ class DbtCloudJobRunException(AirflowException):
"""An exception that indicates a job run failed to complete."""


class DbtCloudResourceLookupError(AirflowException):
"""Exception raised when a dbt Cloud resource cannot be uniquely identified."""


T = TypeVar("T", bound=Any)


Expand Down Expand Up @@ -356,14 +360,23 @@ def get_account(self, account_id: int | None = None) -> Response:
return self._run_and_get_response(endpoint=f"{account_id}/")

@fallback_to_default_account
def list_projects(self, account_id: int | None = None) -> list[Response]:
def list_projects(
self, account_id: int | None = None, name_contains: str | None = None
) -> list[Response]:
"""
Retrieve metadata for all projects tied to a specified dbt Cloud account.

:param account_id: Optional. The ID of a dbt Cloud account.
:param name_contains: Optional. The case-insensitive substring of a dbt Cloud project name to filter by.
:return: List of request responses.
"""
return self._run_and_get_response(endpoint=f"{account_id}/projects/", paginate=True, api_version="v3")
payload = {"name__icontains": name_contains} if name_contains else None
return self._run_and_get_response(
endpoint=f"{account_id}/projects/",
payload=payload,
paginate=True,
api_version="v3",
)

@fallback_to_default_account
def get_project(self, project_id: int, account_id: int | None = None) -> Response:
Expand All @@ -376,27 +389,126 @@ def get_project(self, project_id: int, account_id: int | None = None) -> Respons
"""
return self._run_and_get_response(endpoint=f"{account_id}/projects/{project_id}/", api_version="v3")

@fallback_to_default_account
def get_project_by_name(self, project_name: str, account_id: int | None = None) -> dict:
"""
Retrieve metadata for a specific project using project_name.

Raises DbtCloudResourceLookupError if the project is not found or cannot be uniquely identified by provided parameters.

:param project_name: The name of a dbt Cloud project.
:param account_id: Optional. The ID of a dbt Cloud account.
:return: The details of a project.
"""
list_projects_responses = self.list_projects(name_contains=project_name, account_id=account_id)
# flatten & filter the list of responses to find the exact match
projects = [
project
for response in list_projects_responses
for project in response.json()["data"]
if project["name"] == project_name
]
if len(projects) != 1:
raise DbtCloudResourceLookupError(f"Found {len(projects)} projects with name `{project_name}`.")
return projects[0]

@fallback_to_default_account
def list_environments(
self, project_id: int, *, name_contains: str | None = None, account_id: int | None = None
) -> list[Response]:
"""
Retrieve metadata for all environments tied to a specified dbt Cloud project.

:param project_id: The ID of a dbt Cloud project.
:param name_contains: Optional. The case-insensitive substring of a dbt Cloud environment name to filter by.
:param account_id: Optional. The ID of a dbt Cloud account.
:return: List of request responses.
"""
payload = {"name__icontains": name_contains} if name_contains else None
return self._run_and_get_response(
endpoint=f"{account_id}/projects/{project_id}/environments/",
payload=payload,
paginate=True,
api_version="v3",
)

@fallback_to_default_account
def get_environment(
self, project_id: int, environment_id: int, *, account_id: int | None = None
) -> Response:
"""
Retrieve metadata for a specific project's environment.

:param project_id: The ID of a dbt Cloud project.
:param environment_id: The ID of a dbt Cloud environment.
:param account_id: Optional. The ID of a dbt Cloud account.
:return: The request response.
"""
return self._run_and_get_response(
endpoint=f"{account_id}/projects/{project_id}/environments/{environment_id}/", api_version="v3"
)

@fallback_to_default_account
def get_environment_by_name(
self, project_id: int, environment_name: str, *, account_id: int | None = None
) -> dict:
"""
Retrieve metadata for a specific project's environment using project_id and environment_name.

Raises DbtCloudResourceLookupError if the environment is not found or cannot be uniquely identified by provided parameters.

:param project_id: The ID of a dbt Cloud project.
:param environment_name: The name of a dbt Cloud environment.
:param account_id: Optional. The ID of a dbt Cloud account.
:return: The details of an environment.
"""
list_environments_responses = self.list_environments(
project_id=project_id, name_contains=environment_name, account_id=account_id
)
# flatten & filter the list of responses to find the exact match
environments = [
env
for response in list_environments_responses
for env in response.json()["data"]
if env["name"] == environment_name
]
if len(environments) != 1:
raise DbtCloudResourceLookupError(
f"Found {len(environments)} environments with name `{environment_name}` in project `{project_id}`."
)
return environments[0]

@fallback_to_default_account
def list_jobs(
self,
account_id: int | None = None,
order_by: str | None = None,
project_id: int | None = None,
environment_id: int | None = None,
name_contains: str | None = None,
) -> list[Response]:
"""
Retrieve metadata for all jobs tied to a specified dbt Cloud account.

If a ``project_id`` is supplied, only jobs pertaining to this project will be retrieved.
If an ``environment_id`` is supplied, only jobs pertaining to this environment will be retrieved.

:param account_id: Optional. The ID of a dbt Cloud account.
:param order_by: Optional. Field to order the result by. Use '-' to indicate reverse order.
For example, to use reverse order by the run ID use ``order_by=-id``.
:param project_id: The ID of a dbt Cloud project.
:param project_id: Optional. The ID of a dbt Cloud project.
:param environment_id: Optional. The ID of a dbt Cloud environment.
:param name_contains: Optional. The case-insensitive substring of a dbt Cloud job name to filter by.
:return: List of request responses.
"""
payload = {"order_by": order_by, "project_id": project_id}
if environment_id:
payload["environment_id"] = environment_id
if name_contains:
payload["name__icontains"] = name_contains
return self._run_and_get_response(
endpoint=f"{account_id}/jobs/",
payload={"order_by": order_by, "project_id": project_id},
payload=payload,
paginate=True,
)

Expand All @@ -411,6 +523,63 @@ def get_job(self, job_id: int, account_id: int | None = None) -> Response:
"""
return self._run_and_get_response(endpoint=f"{account_id}/jobs/{job_id}")

@fallback_to_default_account
def get_job_by_name(
self,
*,
project_id: int | None = None,
project_name: str | None = None,
environment_id: int | None = None,
environment_name: str | None = None,
job_name: str,
account_id: int | None = None,
) -> dict:
"""
Retrieve metadata for a specific job by combination of project, environment, and job name.

Raises DbtCloudResourceLookupError if the job is not found or cannot be uniquely identified by provided parameters.

:param project_id: The ID of the dbt Cloud project. Can be used interchangeably with
project_name. Either project_id or project_name must be provided.
:param project_name: The name of the dbt Cloud project. Can be used interchangeably with
project_id. Either project_id or project_name must be provided.
:param environment_id: The ID of the dbt Cloud environment. Can be used interchangeably with
environment_name. Either environment_id or environment_name must be provided.
:param environment_name: The name of the dbt Cloud environment. Can be used interchangeably with
environment_id. Either environment_id or environment_name must be provided.
:param job_name: The name of the dbt Cloud job to look up. Required.
:param account_id: Optional. The ID of a dbt Cloud account.
:return: The details of a job.
"""
if not project_id:
project_id = self.get_project_by_name(project_name=project_name, account_id=account_id)["id"]

if not environment_id:
environment_id = self.get_environment_by_name(
project_id=project_id, environment_name=environment_name, account_id=account_id
)["id"]

# get job using project_id, environment_id and job_name
list_jobs_responses = self.list_jobs(
project_id=project_id,
environment_id=environment_id,
name_contains=job_name,
account_id=account_id,
)
# flatten & filter the list of responses to find the exact match
jobs = [
job
for response in list_jobs_responses
for job in response.json()["data"]
if job["name"] == job_name
]
if len(jobs) != 1:
raise DbtCloudResourceLookupError(
f"Found {len(jobs)} jobs with name `{job_name}` in environment `{(environment_id or environment_name)}` in project `{(project_id or project_name)}`."
)

return jobs[0]

@fallback_to_default_account
def trigger_job_run(
self,
Expand Down
53 changes: 51 additions & 2 deletions providers/src/airflow/providers/dbt/cloud/operators/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,19 @@ class DbtCloudRunJobOperator(BaseOperator):
:ref:`howto/operator:DbtCloudRunJobOperator`

:param dbt_cloud_conn_id: The connection ID for connecting to dbt Cloud.
:param job_id: The ID of a dbt Cloud job.
:param job_id: The ID of a dbt Cloud job. You can either provide this ID directly, or let
the operator look it up using a combination of project, environment, and job name.
:param job_name: The name of the dbt Cloud job to run. Only used for job lookup when
job_id is not provided. Must be used together with either project_id/project_name
and environment_id/environment_name.
:param project_id: The ID of the dbt Cloud project. Only used for job lookup when
job_id is not provided. Can be used interchangeably with project_name.
:param project_name: The name of the dbt Cloud project. Only used for job lookup when
job_id is not provided. Can be used interchangeably with project_id.
:param environment_id: The ID of the dbt Cloud environment. Only used for job lookup when
job_id is not provided. Can be used interchangeably with environment_name.
:param environment_name: The name of the dbt Cloud environment. Only used for job lookup when
job_id is not provided. Can be used interchangeably with environment_id.
:param account_id: Optional. The ID of a dbt Cloud account.
:param trigger_reason: Optional. Description of the reason to trigger the job.
Defaults to "Triggered via Apache Airflow by task <task_id> in the <dag_id> DAG."
Expand Down Expand Up @@ -86,6 +98,11 @@ class DbtCloudRunJobOperator(BaseOperator):
template_fields = (
"dbt_cloud_conn_id",
"job_id",
"job_name",
"project_id",
"project_name",
"environment_id",
"environment_name",
"account_id",
"trigger_reason",
"steps_override",
Expand All @@ -99,7 +116,12 @@ def __init__(
self,
*,
dbt_cloud_conn_id: str = DbtCloudHook.default_conn_name,
job_id: int,
job_id: int | None = None,
job_name: str | None = None,
project_id: int | None = None,
project_name: str | None = None,
environment_id: int | None = None,
environment_name: str | None = None,
account_id: int | None = None,
trigger_reason: str | None = None,
steps_override: list[str] | None = None,
Expand All @@ -117,6 +139,11 @@ def __init__(
self.dbt_cloud_conn_id = dbt_cloud_conn_id
self.account_id = account_id
self.job_id = job_id
self.job_name = job_name
self.project_id = project_id
self.project_name = project_name
self.environment_id = environment_id
self.environment_name = environment_name
self.trigger_reason = trigger_reason
self.steps_override = steps_override
self.schema_override = schema_override
Expand All @@ -135,6 +162,28 @@ def execute(self, context: Context):
f"Triggered via Apache Airflow by task {self.task_id!r} in the {self.dag.dag_id} DAG."
)

if self.job_id is None:
if not (
(self.project_id or self.project_name)
and (self.environment_id or self.environment_name)
and self.job_name
):
raise ValueError(
"Not enough information to lookup the job_id. "
"To identify a job, you must provide either:\n"
"1. A job_id, or\n"
"2. All of the following:\n"
" - project_id or project_name\n"
" - environment_id or environment_name\n"
" - job_name"
)
self.job_id = self.hook.get_job_by_name(
account_id=self.account_id,
project_name=self.project_name,
environment_name=self.environment_name,
job_name=self.job_name,
)["id"]

non_terminal_runs = None
if self.reuse_existing_run:
non_terminal_runs = self.hook.get_job_runs(
Expand Down
Loading