21 changes: 20 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py
@@ -347,7 +347,10 @@ def load_nodes(self) -> Tuple[List[DBTNode], Dict[str, Optional[str]]]:
"runId": self.config.run_id,
},
)

if node_type == "tests":
logger.debug(f"Tests data: {data}")
logger.debug(f"Tests data: {data['job'][node_type]}")
logger.debug(f"Tests data length: {len(data['job'][node_type])}")
raw_nodes.extend(data["job"][node_type])

nodes = [self._parse_into_dbt_node(node) for node in raw_nodes]
@@ -446,32 +449,45 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode:
test_info = None
test_result = None
if node["resourceType"] == "test":
logger.debug(f"Parsing test node: {key}, name={name}")
qualified_test_name = name

# The qualified test name should be the test name from the dbt project.
# It can be simple (e.g. 'unique') or prefixed (e.g. 'dbt_expectations.expect_column_values_to_not_be_null').
# We attempt to guess the test name based on the macros used.
logger.debug(f"Test node {key} dependsOn: {node['dependsOn']}")
for dependency in node["dependsOn"]:
# An example dependsOn list could be:
# ['model.sample_dbt.monthly_billing_with_cust', 'macro.dbt.test_not_null', 'macro.dbt.get_where_subquery']
# In that case, the test should be `not_null`.

if dependency.startswith("macro."):
_, macro = dependency.split(".", 1)
logger.debug(f"Found macro dependency: {macro}")
if macro.startswith("dbt."):
if not macro.startswith("dbt.test_"):
logger.debug(f"Skipping non-test macro: {macro}")
continue
macro = macro[len("dbt.test_") :]
logger.debug(f"Extracted test name from dbt macro: {macro}")

qualified_test_name = macro
logger.debug(f"Using qualified test name: {qualified_test_name}")
break

logger.debug(
f"Creating DBTTest with qualified_test_name={qualified_test_name}, "
f"column_name={node['columnName']}"
)
test_info = DBTTest(
qualified_test_name=qualified_test_name,
column_name=node["columnName"],
kw_args={}, # TODO: dbt Cloud doesn't expose the args.
)
if not node["skip"]:
logger.debug(
f"Creating test result for non-skipped test: {key}, status={node['status']}"
)
test_result = DBTTestResult(
invocation_id=f"job{node['jobId']}-run{node['runId']}",
execution_time=datetime.now(), # TODO: dbt Cloud doesn't expose this.
@@ -489,6 +505,9 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode:
}
},
)
logger.debug(f"Created test result: {test_result}")
else:
logger.debug(f"Test {key} is skipped, not creating test result")

return DBTNode(
dbt_name=key,
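To make the macro-based name resolution traced by the new debug logs easier to verify, here is a minimal, self-contained sketch of the same `dependsOn` handling. `extract_test_name` is a hypothetical helper written for illustration; only the prefix logic is taken from the hunk above.

```python
# Minimal sketch of the qualified-test-name extraction in _parse_into_dbt_node.
# extract_test_name is a hypothetical standalone helper, not part of the PR.
from typing import List


def extract_test_name(name: str, depends_on: List[str]) -> str:
    qualified_test_name = name  # fall back to the node's own name
    for dependency in depends_on:
        if dependency.startswith("macro."):
            _, macro = dependency.split(".", 1)
            if macro.startswith("dbt."):
                if not macro.startswith("dbt.test_"):
                    # e.g. macro.dbt.get_where_subquery is not a test macro
                    continue
                macro = macro[len("dbt.test_") :]
            qualified_test_name = macro
            break
    return qualified_test_name


# The dependsOn example from the comment above resolves to "not_null".
assert extract_test_name(
    "not_null_monthly_billing_with_cust",
    [
        "model.sample_dbt.monthly_billing_with_cust",
        "macro.dbt.test_not_null",
        "macro.dbt.get_where_subquery",
    ],
) == "not_null"
```

A prefixed macro such as `macro.dbt_expectations.expect_column_values_to_not_be_null` falls through the `dbt.` branch and is kept whole, matching the prefixed case described in the comment.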
45 changes: 40 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
@@ -897,16 +897,24 @@ def create_test_entity_mcps(
extra_custom_props: Dict[str, str],
all_nodes_map: Dict[str, DBTNode],
) -> Iterable[MetadataChangeProposalWrapper]:
logger.debug(f"Creating test entity MCPs for {len(test_nodes)} test nodes")
for node in sorted(test_nodes, key=lambda n: n.dbt_name):
logger.debug(f"Processing test node: {node.dbt_name}")
upstreams = get_upstreams_for_test(
test_node=node,
all_nodes_map=all_nodes_map,
platform_instance=self.config.platform_instance,
environment=self.config.env,
)
logger.debug(
f"Found {len(upstreams)} upstreams for test {node.dbt_name}: {list(upstreams.keys())}"
)

# In case a dbt test depends on multiple tables, we create separate assertions for each.
for upstream_node_name, upstream_urn in upstreams.items():
logger.debug(
f"Creating assertion for test {node.dbt_name} on upstream {upstream_node_name}"
)
guid_upstream_part = {}
if len(upstreams) > 1:
# If we depend on multiple upstreams, we need to generate a unique guid for each assertion.
@@ -946,6 +954,9 @@
}

if self.config.entities_enabled.can_emit_test_definitions:
logger.debug(
f"Emitting test definition for {node.dbt_name} with assertion_urn={assertion_urn}"
)
yield MetadataChangeProposalWrapper(
entityUrn=assertion_urn,
aspect=self._make_data_platform_instance_aspect(),
@@ -957,9 +968,19 @@
assertion_urn,
upstream_urn,
)
else:
logger.debug(
f"Skipping test definition emission for {node.dbt_name} (disabled)"
)

for test_result in node.test_results:
logger.debug(
f"Processing test result for {node.dbt_name}: {test_result.invocation_id}, status={test_result.status}"
)
if self.config.entities_enabled.can_emit_test_results:
logger.debug(
f"Emitting test result for {node.dbt_name} ({test_result.invocation_id})"
)
yield make_assertion_result_from_test(
node,
test_result,
@@ -1025,6 +1046,9 @@ def get_workunits_internal(
test_nodes = [test_node for test_node in nodes if test_node.node_type == "test"]

logger.info(f"Creating dbt metadata for {len(nodes)} nodes")
logger.debug(
f"Found {len(test_nodes)} test nodes and {len(non_test_nodes)} non-test nodes"
)
yield from self.create_dbt_platform_mces(
non_test_nodes,
additional_custom_props_filtered,
@@ -1034,11 +1058,15 @@
logger.info(f"Updating {self.config.target_platform} metadata")
yield from self.create_target_platform_mces(non_test_nodes)

yield from self.create_test_entity_mcps(
test_nodes,
additional_custom_props_filtered,
all_nodes_map,
)
if test_nodes:
logger.debug(f"Creating test entity MCPs for {len(test_nodes)} test nodes")
yield from self.create_test_entity_mcps(
test_nodes,
additional_custom_props_filtered,
all_nodes_map,
)
else:
logger.debug("No test nodes found, skipping test entity MCP creation")

def _is_allowed_node(self, node: DBTNode) -> bool:
"""
@@ -1083,15 +1111,22 @@ def _is_allowed_materialized_node(self, node: DBTNode) -> bool:

def _filter_nodes(self, all_nodes: List[DBTNode]) -> List[DBTNode]:
nodes: List[DBTNode] = []
test_nodes_filtered = 0
for node in all_nodes:
key = node.dbt_name

if not self._is_allowed_node(node):
self.report.nodes_filtered.append(key)
if node.node_type == "test":
test_nodes_filtered += 1
logger.debug(f"Filtered out test node: {key}")
continue

nodes.append(node)

logger.debug(
f"Filtered {test_nodes_filtered} test nodes out of {len(all_nodes)} total nodes"
)
return nodes

def _drop_duplicate_sources(self, original_nodes: List[DBTNode]) -> List[DBTNode]:
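As a quick illustration of the new guard in `get_workunits_internal`, the sketch below reproduces the test/non-test split and the `if test_nodes:` check in isolation. The `DBTNode` dataclass here is a simplified stand-in, not the real class from `dbt_common.py`.

```python
# Stand-in sketch of the node split and the test-node guard added above.
from dataclasses import dataclass
from typing import List


@dataclass
class DBTNode:  # simplified stand-in for the real DBTNode
    dbt_name: str
    node_type: str


def split_and_guard(nodes: List[DBTNode]) -> None:
    non_test_nodes = [n for n in nodes if n.node_type != "test"]
    test_nodes = [n for n in nodes if n.node_type == "test"]
    print(f"{len(test_nodes)} test nodes, {len(non_test_nodes)} non-test nodes")
    if test_nodes:
        print(f"Creating test entity MCPs for {len(test_nodes)} test nodes")
    else:
        print("No test nodes found, skipping test entity MCP creation")


split_and_guard(
    [
        DBTNode("model.sample_dbt.monthly_billing_with_cust", "model"),
        DBTNode("test.sample_dbt.not_null_monthly_billing", "test"),
    ]
)
```

Before this change, `create_test_entity_mcps` was called unconditionally; the guard only changes behavior by adding a debug line for the empty case, since the loop body is a no-op on an empty list.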
29 changes: 29 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_tests.py
@@ -1,4 +1,5 @@
import json
import logging
import re
from dataclasses import dataclass
from datetime import datetime
@@ -25,6 +26,8 @@
if TYPE_CHECKING:
from datahub.ingestion.source.dbt.dbt_common import DBTNode

logger = logging.getLogger(__name__)


@dataclass
class DBTTest:
@@ -165,8 +168,19 @@ def make_assertion_from_test(
column_name = node.test_info.column_name
kw_args = node.test_info.kw_args

logger.debug(
f"Creating assertion from test: node={node.dbt_name}, "
f"qualified_test_name={qualified_test_name}, column_name={column_name}, "
f"kw_args={kw_args}, assertion_urn={assertion_urn}, upstream_urn={upstream_urn}"
)

if qualified_test_name in _DBT_TEST_NAME_TO_ASSERTION_MAP:
assertion_params = _DBT_TEST_NAME_TO_ASSERTION_MAP[qualified_test_name]
logger.debug(
f"Using mapped assertion for test {qualified_test_name}: "
f"scope={assertion_params.scope}, operator={assertion_params.operator}, "
f"aggregation={assertion_params.aggregation}"
)
assertion_info = AssertionInfoClass(
type=AssertionTypeClass.DATASET,
customProperties=extra_custom_props,
@@ -200,6 +214,10 @@ def make_assertion_from_test(
)
elif column_name:
# no match with known test types, column-level test
logger.debug(
f"Creating column-level native assertion for test {qualified_test_name} "
f"on column {column_name}"
)
assertion_info = AssertionInfoClass(
type=AssertionTypeClass.DATASET,
customProperties=extra_custom_props,
@@ -216,6 +234,10 @@
)
else:
# no match with known test types, default to row-level test
logger.debug(
f"Creating row-level native assertion for test {qualified_test_name} "
f"(no column specified)"
)
assertion_info = AssertionInfoClass(
type=AssertionTypeClass.DATASET,
customProperties=extra_custom_props,
@@ -243,6 +265,13 @@ def make_assertion_result_from_test(
upstream_urn: str,
test_warnings_are_errors: bool,
) -> MetadataChangeProposalWrapper:
logger.debug(
f"Creating assertion result from test: node={node.dbt_name}, "
f"status={test_result.status}, invocation_id={test_result.invocation_id}, "
f"test_warnings_are_errors={test_warnings_are_errors}, "
f"native_results={test_result.native_results}"
)

assertionResult = AssertionRunEventClass(
timestampMillis=int(test_result.execution_time.timestamp() * 1000.0),
assertionUrn=assertion_urn,
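Finally, the new debug lines in `make_assertion_from_test` annotate a three-way dispatch that can be summarized as follows. `_KNOWN_TESTS` only approximates the keys of `_DBT_TEST_NAME_TO_ASSERTION_MAP` (the real map holds assertion parameters rather than strings, and its exact contents are an assumption here).

```python
# Sketch of the three-way dispatch traced by the new debug logs in
# make_assertion_from_test. _KNOWN_TESTS approximates the keys of
# _DBT_TEST_NAME_TO_ASSERTION_MAP (assumed contents, for illustration only).
from typing import Optional

_KNOWN_TESTS = {"unique", "not_null", "accepted_values", "relationships"}


def assertion_kind(qualified_test_name: str, column_name: Optional[str]) -> str:
    if qualified_test_name in _KNOWN_TESTS:
        # mapped to a standard assertion (scope/operator/aggregation)
        return "mapped standard assertion"
    elif column_name:
        # unknown test with a column -> column-level native assertion
        return "column-level native assertion"
    else:
        # unknown test without a column -> row-level native assertion
        return "row-level native assertion"


assert assertion_kind("not_null", "amount") == "mapped standard assertion"
assert assertion_kind("dbt_expectations.expect_x", "amount") == "column-level native assertion"
assert assertion_kind("custom_row_check", None) == "row-level native assertion"
```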