Skip to content
This repository was archived by the owner on May 17, 2024. It is now read-only.

Compound keys implementation, using product order #375

Merged
merged 5 commits into from
Feb 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions data_diff/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
from typing import Sequence, Tuple, Iterator, Optional, Union

from sqeleton.abcs import DbKey, DbTime, DbPath
from sqeleton.abcs import DbTime, DbPath

from .tracking import disable_tracking
from .databases import connect
from .diff_tables import Algorithm
from .hashdiff_tables import HashDiffer, DEFAULT_BISECTION_THRESHOLD, DEFAULT_BISECTION_FACTOR
from .joindiff_tables import JoinDiffer, TABLE_WRITE_LIMIT
from .table_segment import TableSegment
from .utils import eval_name_template
from .utils import eval_name_template, Vector


def connect_to_table(
Expand Down Expand Up @@ -51,8 +51,8 @@ def diff_tables(
# Extra columns to compare
extra_columns: Tuple[str, ...] = None,
# Start/end key_column values, used to restrict the segment
min_key: DbKey = None,
max_key: DbKey = None,
min_key: Vector = None,
max_key: Vector = None,
# Start/end update_column values, used to restrict the segment
min_update: DbTime = None,
max_update: DbTime = None,
Expand Down Expand Up @@ -87,8 +87,8 @@ def diff_tables(
update_column (str, optional): Name of updated column, which signals that rows changed.
Usually updated_at or last_update. Used by `min_update` and `max_update`.
extra_columns (Tuple[str, ...], optional): Extra columns to compare
min_key (:data:`DbKey`, optional): Lowest key value, used to restrict the segment
max_key (:data:`DbKey`, optional): Highest key value, used to restrict the segment
min_key (:data:`Vector`, optional): Lowest key value, used to restrict the segment
max_key (:data:`Vector`, optional): Highest key value, used to restrict the segment
min_update (:data:`DbTime`, optional): Lowest update_column value, used to restrict the segment
max_update (:data:`DbTime`, optional): Highest update_column value, used to restrict the segment
threaded (bool): Enable/disable threaded diffing. Needed to take advantage of database threads.
Expand Down
2 changes: 2 additions & 0 deletions data_diff/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import requests


def import_dbt():
try:
from dbt_artifacts_parser.parser import parse_run_results, parse_manifest
Expand All @@ -19,6 +20,7 @@ def import_dbt():

return parse_run_results, parse_manifest, ProfileRenderer, yaml


from .tracking import (
set_entrypoint_name,
create_end_event_json,
Expand Down
83 changes: 47 additions & 36 deletions data_diff/diff_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

from data_diff.info_tree import InfoTree, SegmentInfo

from .utils import run_as_daemon, safezip, getLogger, truncate_error
from .utils import run_as_daemon, safezip, getLogger, truncate_error, Vector
from .thread_utils import ThreadedYielder
from .table_segment import TableSegment
from .table_segment import TableSegment, create_mesh_from_points
from .tracking import create_end_event_json, create_start_event_json, send_event_json, is_tracking_enabled
from sqeleton.abcs import IKey

Expand Down Expand Up @@ -135,7 +135,6 @@ def _get_stats(self, is_dbt: bool = False) -> DiffStats:

return DiffStats(diff_by_sign, table1_count, table2_count, unchanged, diff_percent, extra_column_diffs)


def get_stats_string(self, is_dbt: bool = False):
diff_stats = self._get_stats(is_dbt)

Expand Down Expand Up @@ -271,63 +270,75 @@ def _diff_segments(
):
...

def _bisect_and_diff_tables(self, table1, table2, info_tree):
if len(table1.key_columns) > 1:
raise NotImplementedError("Composite key not supported yet!")
if len(table2.key_columns) > 1:
raise NotImplementedError("Composite key not supported yet!")
def _bisect_and_diff_tables(self, table1: TableSegment, table2: TableSegment, info_tree):
if len(table1.key_columns) != len(table2.key_columns):
raise ValueError("Tables should have an equivalent number of key columns!")
(key1,) = table1.key_columns
(key2,) = table2.key_columns

key_type = table1._schema[key1]
key_type2 = table2._schema[key2]
if not isinstance(key_type, IKey):
raise NotImplementedError(f"Cannot use column of type {key_type} as a key")
if not isinstance(key_type2, IKey):
raise NotImplementedError(f"Cannot use column of type {key_type2} as a key")
if key_type.python_type is not key_type2.python_type:
raise TypeError(f"Incompatible key types: {key_type} and {key_type2}")

key_types1 = [table1._schema[i] for i in table1.key_columns]
key_types2 = [table2._schema[i] for i in table2.key_columns]

for kt in key_types1 + key_types2:
if not isinstance(kt, IKey):
raise NotImplementedError(f"Cannot use a column of type {kt} as a key")

for kt1, kt2 in safezip(key_types1, key_types2):
if kt1.python_type is not kt2.python_type:
raise TypeError(f"Incompatible key types: {kt1} and {kt2}")

# Query min/max values
key_ranges = self._threaded_call_as_completed("query_key_range", [table1, table2])

# Start with the first completed value, so we don't waste time waiting
min_key1, max_key1 = self._parse_key_range_result(key_type, next(key_ranges))
min_key1, max_key1 = self._parse_key_range_result(key_types1, next(key_ranges))

table1, table2 = [t.new(min_key=min_key1, max_key=max_key1) for t in (table1, table2)]
btable1, btable2 = [t.new_key_bounds(min_key=min_key1, max_key=max_key1) for t in (table1, table2)]

logger.info(
f"Diffing segments at key-range: {table1.min_key}..{table2.max_key}. "
f"size: table1 <= {table1.approximate_size()}, table2 <= {table2.approximate_size()}"
f"Diffing segments at key-range: {btable1.min_key}..{btable2.max_key}. "
f"size: table1 <= {btable1.approximate_size()}, table2 <= {btable2.approximate_size()}"
)

ti = ThreadedYielder(self.max_threadpool_size)
# Bisect (split) the table into segments, and diff them recursively.
ti.submit(self._bisect_and_diff_segments, ti, table1, table2, info_tree)
ti.submit(self._bisect_and_diff_segments, ti, btable1, btable2, info_tree)

# Now we check for the second min-max, to diff the portions we "missed".
min_key2, max_key2 = self._parse_key_range_result(key_type, next(key_ranges))
# This is achieved by subtracting the table ranges, and dividing the resulting space into aligned boxes.
# For example, given tables A & B, and a 2D compound key, where A was queried first for key-range,
# the regions of B we need to diff in this second pass are marked by B1..8:
# ┌──┬──────┬──┐
# │B1│ B2 │B3│
# ├──┼──────┼──┤
# │B4│ A │B5│
# ├──┼──────┼──┤
# │B6│ B7 │B8│
# └──┴──────┴──┘
# Overall, the max number of new regions in this 2nd pass is 3^|k| - 1, where |k| is the number of key columns (8 in the 2D example above).

if min_key2 < min_key1:
pre_tables = [t.new(min_key=min_key2, max_key=min_key1) for t in (table1, table2)]
ti.submit(self._bisect_and_diff_segments, ti, *pre_tables, info_tree)
min_key2, max_key2 = self._parse_key_range_result(key_types1, next(key_ranges))

if max_key2 > max_key1:
post_tables = [t.new(min_key=max_key1, max_key=max_key2) for t in (table1, table2)]
ti.submit(self._bisect_and_diff_segments, ti, *post_tables, info_tree)
points = [list(sorted(p)) for p in safezip(min_key1, min_key2, max_key1, max_key2)]
box_mesh = create_mesh_from_points(*points)

new_regions = [(p1, p2) for p1, p2 in box_mesh if p1 < p2 and not (p1 >= min_key1 and p2 <= max_key1)]

for p1, p2 in new_regions:
extra_tables = [t.new_key_bounds(min_key=p1, max_key=p2) for t in (table1, table2)]
ti.submit(self._bisect_and_diff_segments, ti, *extra_tables, info_tree)

return ti

def _parse_key_range_result(self, key_type, key_range):
mn, mx = key_range
cls = key_type.make_value
def _parse_key_range_result(self, key_types, key_range) -> Tuple[Vector, Vector]:
min_key_values, max_key_values = key_range

# We add 1 because our ranges are exclusive of the end (like in Python)
try:
return cls(mn), cls(mx) + 1
min_key = Vector(key_type.make_value(mn) for key_type, mn in safezip(key_types, min_key_values))
max_key = Vector(key_type.make_value(mx) + 1 for key_type, mx in safezip(key_types, max_key_values))
except (TypeError, ValueError) as e:
raise type(e)(f"Cannot apply {key_type} to '{mn}', '{mx}'.") from e
raise type(e)(f"Cannot apply {key_types} to '{min_key_values}', '{max_key_values}'.") from e

return min_key, max_key

def _bisect_and_diff_segments(
self,
Expand Down
Loading