Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 85 additions & 76 deletions ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"""

import traceback
from typing import List, Optional
from typing import Iterable, List, Optional, Tuple

from metadata.generated.schema.entity.services.connections.pipeline.dbtCloudConnection import (
DBTCloudConnection,
Expand All @@ -22,13 +22,13 @@
from metadata.ingestion.source.pipeline.dbtcloud.models import (
DBTJob,
DBTJobList,
DBTModel,
DBTModelList,
DBTRun,
DBTRunList,
)
from metadata.ingestion.source.pipeline.dbtcloud.queries import (
DBT_GET_MODEL_DEPENDS_ON,
DBT_GET_MODELS_SEEDS,
DBT_GET_MODELS_WITH_LINEAGE,
)
from metadata.utils.constants import AUTHORIZATION_HEADER
from metadata.utils.helpers import clean_uri
Expand Down Expand Up @@ -69,14 +69,15 @@
self.graphql_client = REST(graphql_client_config)

def _get_jobs(
self, job_id: str = None, project_id: str = None
) -> Optional[List[DBTJob]]:
self,
job_id: str = None,
project_id: str = None,
) -> Iterable[DBTJob]:
"""
fetch jobs for an account in dbt cloud
Fetch jobs for an account in dbt cloud
"""
job_list = []
# we will get 100 jobs at a time
query_params = {"offset": 0, "limit": 100}

try:
job_path = f"{job_id}/" if job_id else ""
project_path = f"?project_id={project_id}" if project_id else ""
Expand All @@ -86,10 +87,10 @@
)

if job_id:
job_list = [DBTJob.model_validate(result["data"])]
yield DBTJob.model_validate(result["data"])
else:
job_list_response = DBTJobList.model_validate(result)
job_list = job_list_response.Jobs
yield from job_list_response.Jobs

while job_list_response.extra and job_list_response.extra.pagination:
total_count = job_list_response.extra.pagination.total_count
Expand All @@ -104,14 +105,13 @@
data=query_params,
)
job_list_response = DBTJobList.model_validate(result)
job_list.extend(job_list_response.Jobs)
yield from job_list_response.Jobs

except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Failed to get job info for project_id: `{project_id}` or job_id: `{job_id}` : {exc}"
)
return job_list

def test_get_jobs(self) -> List[DBTJob]:
"""
Expand All @@ -128,47 +128,68 @@
run_list = DBTRunList.model_validate(result).Runs
return run_list

def get_jobs(self) -> Optional[List[DBTJob]]:
def get_jobs(self) -> Iterable[DBTJob]:

Check failure on line 131 in ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/client.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 22 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZrwU5OiGGKBy2om_hD_&open=AZrwU5OiGGKBy2om_hD_&pullRequest=24722
"""
list jobs for an account in dbt cloud
List jobs for an account in dbt cloud using generator pattern.
yields job one at a time for memory efficiency.
"""
try:
jobs = []
# case when job_ids are specified and project_ids are not
if self.job_ids and not self.project_ids:
for job_id in self.job_ids:
jobs.extend(self._get_jobs(job_id=job_id))
yield from self._get_jobs(job_id=job_id)
# case when project_ids are specified or both are specified
elif self.project_ids:
for project_id in self.project_ids:
results = self._get_jobs(project_id=project_id)
if self.job_ids:
jobs.extend(
[
result
for result in results
if str(result.id) in self.job_ids
]
)
else:
jobs.extend(results)
for job in self._get_jobs(project_id=project_id):
if self.job_ids:
if str(job.id) in self.job_ids:
yield job
else:
yield job
else:
results = self._get_jobs()
jobs.extend(results)
return jobs
yield from self._get_jobs()
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(f"Unable to get job info :{exc}")

return None
def get_latest_successful_run_id(self, job_id: int) -> Optional[int]:
"""
Get the latest successful run ID for a given job.
"""
try:
query_params = {
"job_definition_id": job_id,
"order_by": "-finished_at",
"limit": "1",
"status": "10", # 10 = Success in dbt Cloud API
}

def get_runs(self, job_id: int) -> Optional[List[DBTRun]]:
result = self.client.get(
f"/accounts/{self.config.accountId}/runs/", data=query_params
)
run_list_response = DBTRunList.model_validate(result)

if run_list_response.Runs:
return run_list_response.Runs[0].id

return None

except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Unable to get latest successful run for job {job_id}: {exc}"
)
return None

def get_runs(self, job_id: int) -> Iterable[DBTRun]:

Check failure on line 185 in ingestion/src/metadata/ingestion/source/pipeline/dbtcloud/client.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 21 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZrwU5OiGGKBy2om_hEA&open=AZrwU5OiGGKBy2om_hEA&pullRequest=24722
"""
list runs for a job in dbt cloud
List runs for a job in dbt cloud using generator pattern.
yields run one at a time for memory efficiency.
"""
try:
number_of_runs = self.config.numberOfRuns
runs = []
yielded_count = 0
# we will get 100 runs at a time and order by created_at in descending order
query_params = {
"job_definition_id": job_id,
Expand All @@ -181,13 +202,17 @@
f"/accounts/{self.config.accountId}/runs/", data=query_params
)
run_list_response = DBTRunList.model_validate(result)
runs.extend(run_list_response.Runs)

while (
(number_of_runs is None or len(runs) < number_of_runs)
and run_list_response.extra
and run_list_response.extra.pagination
):
for run in run_list_response.Runs or []:
if number_of_runs is not None and yielded_count >= number_of_runs:
return
yield run
yielded_count += 1

while run_list_response.extra and run_list_response.extra.pagination:
if number_of_runs is not None and yielded_count >= number_of_runs:
return

total_count = run_list_response.extra.pagination.total_count
current_count = run_list_response.extra.pagination.count

Expand All @@ -200,60 +225,44 @@
data=query_params,
)
run_list_response = DBTRunList.model_validate(result)
runs.extend(run_list_response.Runs)

return runs[:number_of_runs] if number_of_runs is not None else runs
for run in run_list_response.Runs or []:
if number_of_runs is not None and yielded_count >= number_of_runs:
return
yield run
yielded_count += 1

except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Unable to get run info :{exc}")

return None

def get_model_details(self, job_id: int, run_id: int):
def get_models_with_lineage(
self, job_id: int, run_id: int
) -> Tuple[
Optional[List[DBTModel]], Optional[List[DBTModel]], Optional[List[DBTModel]]
]:
"""
get model details for a job in dbt cloud for lineage
Get models with dependsOn and seeds in a single GraphQL call.
"""
try:
query_params = {
"query": DBT_GET_MODEL_DEPENDS_ON,
"variables": {"jobId": job_id, "runId": run_id},
}

result = self.graphql_client.post("", json=query_params)

if result.get("data") and result["data"].get("job"):
model_list = DBTModelList.model_validate(result["data"]["job"]).models
logger.debug(
f"Successfully fetched models from dbt for job_id:{job_id} run_id:{run_id}: {model_list}"
)
return model_list

except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Unable to get model info :{exc}")
return None

def get_models_and_seeds_details(self, job_id: int, run_id: int):
"""
get parent model details for a job in dbt cloud for lineage
"""
try:
query_params = {
"query": DBT_GET_MODELS_SEEDS,
"query": DBT_GET_MODELS_WITH_LINEAGE,
"variables": {"jobId": job_id, "runId": run_id},
}

result = self.graphql_client.post("", json=query_params)

if result.get("data") and result["data"].get("job"):
result = DBTModelList.model_validate(result["data"]["job"])
parents_list = result.models + result.seeds
model_list = DBTModelList.model_validate(result["data"]["job"])
logger.debug(
f"Successfully fetched parent models from dbt for job_id:{job_id} run_id:{run_id}: {parents_list}"
f"Successfully fetched models and seeds from dbt for "
f"job_id:{job_id} run_id:{run_id}: "
f"models={len(model_list.models or [])}, seeds={len(model_list.seeds or [])}, sources={len(model_list.sources or [])}"
)
return parents_list
return model_list.models, model_list.seeds, model_list.sources

except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Unable to get parents model info :{exc}")
return None
logger.warning(f"Unable to get models with lineage info: {exc}")
return None, None
Loading
Loading