Skip to content
This repository was archived by the owner on May 17, 2024. It is now read-only.

Add events for dbt feature #380

Merged
merged 2 commits into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 56 additions & 16 deletions data_diff/dbt.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import logging
import os
import time
import rich
import yaml
from dataclasses import dataclass
Expand All @@ -11,6 +12,14 @@
from dbt_artifacts_parser.parser import parse_run_results, parse_manifest
from dbt.config.renderer import ProfileRenderer

from .tracking import (
set_entrypoint_name,
create_end_event_json,
create_start_event_json,
send_event_json,
is_tracking_enabled,
)
from .utils import run_as_daemon, truncate_error
from . import connect_to_table, diff_tables, Algorithm

RUN_RESULTS_PATH = "/target/run_results.json"
Expand All @@ -33,6 +42,7 @@ class DiffVars:
def dbt_diff(
profiles_dir_override: Optional[str] = None, project_dir_override: Optional[str] = None, is_cloud: bool = False
) -> None:
set_entrypoint_name("CLI-dbt")
dbt_parser = DbtParser(profiles_dir_override, project_dir_override, is_cloud)
models = dbt_parser.get_models()
dbt_parser.set_project_dict()
Expand Down Expand Up @@ -190,22 +200,53 @@ def _cloud_diff(diff_vars: DiffVars) -> None:
"Authorization": f"Key {api_key}",
"Content-Type": "application/json",
}
if is_tracking_enabled():
event_json = create_start_event_json({"is_cloud": True, "datasource_id": diff_vars.datasource_id})
run_as_daemon(send_event_json, event_json)

response = requests.request("POST", url, headers=headers, json=payload, timeout=30)
response.raise_for_status()
data = response.json()
diff_id = data["id"]
# TODO in future we should support self hosted datafold
diff_url = f"https://app.datafold.com/datadiffs/{diff_id}/overview"
rich.print(
"[red]"
+ ".".join(diff_vars.dev_path)
+ " <> "
+ ".".join(diff_vars.prod_path)
+ "[/] \n Diff in progress: \n "
+ diff_url
+ "\n"
)
start = time.monotonic()
error = None
diff_id = None
try:
response = requests.request("POST", url, headers=headers, json=payload, timeout=30)
response.raise_for_status()
data = response.json()
diff_id = data["id"]
# TODO in future we should support self hosted datafold
diff_url = f"https://app.datafold.com/datadiffs/{diff_id}/overview"
rich.print(
"[red]"
+ ".".join(diff_vars.dev_path)
+ " <> "
+ ".".join(diff_vars.prod_path)
+ "[/] \n Diff in progress: \n "
+ diff_url
+ "\n"
)
except BaseException as ex: # Catch KeyboardInterrupt too
error = ex
finally:
# we don't currently have much of this information
# but I imagine a future iteration of this _cloud method
# will poll for results
if is_tracking_enabled():
err_message = truncate_error(repr(error))
event_json = create_end_event_json(
is_success=error is None,
runtime_seconds=time.monotonic() - start,
data_source_1_type="",
data_source_2_type="",
table1_count=0,
table2_count=0,
diff_count=0,
error=err_message,
diff_id=diff_id,
is_cloud=True,
)
send_event_json(event_json)

if error:
raise error


class DbtParser:
Expand All @@ -230,7 +271,6 @@ def get_models(self):

dbt_version = parse_version(run_results_obj.metadata.dbt_version)

# TODO 1.4 support
if dbt_version < parse_version(LOWER_DBT_V) or dbt_version >= parse_version(UPPER_DBT_V):
raise Exception(
f"Found dbt: v{dbt_version} Expected the dbt project's version to be >= {LOWER_DBT_V} and < {UPPER_DBT_V}"
Expand Down
10 changes: 1 addition & 9 deletions data_diff/diff_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from data_diff.info_tree import InfoTree, SegmentInfo

from .utils import run_as_daemon, safezip, getLogger
from .utils import run_as_daemon, safezip, getLogger, truncate_error
from .thread_utils import ThreadedYielder
from .table_segment import TableSegment
from .tracking import create_end_event_json, create_start_event_json, send_event_json, is_tracking_enabled
Expand All @@ -32,11 +32,6 @@ class Algorithm(Enum):
DiffResult = Iterator[Tuple[str, tuple]] # Iterator[Tuple[Literal["+", "-"], tuple]]


def truncate_error(error: str):
first_line = error.split("\n", 1)[0]
return re.sub("'(.*?)'", "'***'", first_line)


@dataclass
class ThreadBase:
"Provides utility methods for optional threading"
Expand Down Expand Up @@ -124,7 +119,6 @@ def _get_stats(self) -> DiffStats:
return DiffStats(diff_by_sign, table1_count, table2_count, unchanged, diff_percent)

def get_stats_string(self):

diff_stats = self._get_stats()
string_output = ""
string_output += f"{diff_stats.table1_count} rows in table A\n"
Expand All @@ -143,7 +137,6 @@ def get_stats_string(self):
return string_output

def get_stats_dict(self):

diff_stats = self._get_stats()
json_output = {
"rows_A": diff_stats.table1_count,
Expand Down Expand Up @@ -190,7 +183,6 @@ def _diff_tables_wrapper(self, table1: TableSegment, table2: TableSegment, info_
start = time.monotonic()
error = None
try:

# Query and validate schema
table1, table2 = self._threaded_call("with_schema", [table1, table2])
self._validate_and_adjust_columns(table1, table2)
Expand Down
12 changes: 8 additions & 4 deletions data_diff/tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,14 @@ def create_start_event_json(diff_options: Dict[str, Any]):
def create_end_event_json(
is_success: bool,
runtime_seconds: float,
db1: str,
db2: str,
data_source_1_type: str,
Copy link
Contributor Author

@dlawin dlawin Feb 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed these two just because I found the names confusing/misleading

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

non-blocking: I'll need to update downstream reporting

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this just impacts the python code, the actual event is not affected

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh I see, still "data_source_1_type" 👍

data_source_2_type: str,
table1_count: int,
table2_count: int,
diff_count: int,
error: Optional[str],
diff_id: Optional[int] = None,
is_cloud: bool = False,
):
return {
"event": "os_diff_run_end",
Expand All @@ -100,14 +102,16 @@ def create_end_event_json(
"time": time(),
"is_success": is_success,
"runtime_seconds": runtime_seconds,
"data_source_1_type": db1,
"data_source_2_type": db2,
"data_source_1_type": data_source_1_type,
"data_source_2_type": data_source_2_type,
"table_1_rows_cnt": table1_count,
"table_2_rows_cnt": table2_count,
"diff_rows_cnt": diff_count,
"error_message": error,
"data_diff_version:": __version__,
"entrypoint_name": entrypoint_name,
"is_cloud": is_cloud,
"diff_id": diff_id,
},
}

Expand Down
5 changes: 5 additions & 0 deletions data_diff/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,8 @@ def get_timestamp(_match):
return datetime.now().isoformat("_", "seconds").replace(":", "_")

return re.sub("%t", get_timestamp, name)


def truncate_error(error: str):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved this from diff_tables.py so I can reuse it

first_line = error.split("\n", 1)[0]
return re.sub("'(.*?)'", "'***'", first_line)