Skip to content
This repository was archived by the owner on May 17, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions data_diff/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ def _main(
]

diff_iter = differ.diff_tables(*segments)
info = diff_iter.info_tree.info

if limit:
diff_iter = islice(diff_iter, int(limit))
Expand All @@ -395,9 +396,8 @@ def _main(
for sign in diff_by_key.values():
diff_by_sign[sign] += 1

table1_count = differ.stats.pop("table1_count")
table2_count = differ.stats.pop("table2_count")
del differ.stats["diff_count"]
table1_count = info.rowcounts[1]
table2_count = info.rowcounts[2]
unchanged = table1_count - diff_by_sign["-"] - diff_by_sign["!"]
diff_percent = 1 - unchanged / max(table1_count, table2_count)

Expand All @@ -423,9 +423,9 @@ def _main(
rich.print(f"{100*diff_percent:.2f}% difference score")

if differ.stats:
print("Extra-Info:")
for k, v in differ.stats.items():
print(f" {k} = {v}")
print("\nExtra-Info:")
for k, v in sorted(differ.stats.items()):
rich.print(f" {k} = {v}")
else:
for op, values in diff_iter:
color = COLOR_SCHEME[op]
Expand Down
56 changes: 41 additions & 15 deletions data_diff/diff_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
from enum import Enum
from contextlib import contextmanager
from operator import methodcaller
from typing import Tuple, Iterator, Optional
from typing import Iterable, Tuple, Iterator, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed

from runtype import dataclass

from data_diff.info_tree import InfoTree, SegmentInfo

from .utils import run_as_daemon, safezip, getLogger
from .thread_utils import ThreadedYielder
from .table_segment import TableSegment
Expand Down Expand Up @@ -76,11 +78,20 @@ def _run_in_background(self, *funcs):
f.result()


@dataclass
class DiffResultWrapper:
    """Bundle the iterable of diff results with the InfoTree describing the run.

    Iterating over the wrapper iterates over the wrapped `diff` object, so the
    wrapper can be consumed wherever the bare diff iterable was consumed.
    """

    diff: iter  # DiffResult
    info_tree: InfoTree

    def __iter__(self):
        """Return an iterator over the wrapped diff results."""
        wrapped = self.diff
        return iter(wrapped)


class TableDiffer(ThreadBase, ABC):
bisection_factor = 32
stats: dict = {}

def diff_tables(self, table1: TableSegment, table2: TableSegment) -> DiffResult:
def diff_tables(self, table1: TableSegment, table2: TableSegment, info_tree: InfoTree = None) -> DiffResultWrapper:
"""Diff the given tables.

Parameters:
Expand All @@ -93,14 +104,17 @@ def diff_tables(self, table1: TableSegment, table2: TableSegment) -> DiffResult:
('+', row) for items in table2 but not in table1.
Where `row` is a tuple of values, corresponding to the diffed columns.
"""
if info_tree is None:
info_tree = InfoTree(SegmentInfo([table1, table2]))
return DiffResultWrapper(self._diff_tables_wrapper(table1, table2, info_tree), info_tree)

def _diff_tables_wrapper(self, table1: TableSegment, table2: TableSegment, info_tree: InfoTree) -> DiffResult:
if is_tracking_enabled():
options = dict(self)
options["differ_name"] = type(self).__name__
event_json = create_start_event_json(options)
run_as_daemon(send_event_json, event_json)

self.stats["diff_count"] = 0
start = time.monotonic()
error = None
try:
Expand All @@ -109,16 +123,18 @@ def diff_tables(self, table1: TableSegment, table2: TableSegment) -> DiffResult:
table1, table2 = self._threaded_call("with_schema", [table1, table2])
self._validate_and_adjust_columns(table1, table2)

yield from self._diff_tables(table1, table2)
yield from self._diff_tables_root(table1, table2, info_tree)

except BaseException as e: # Catch KeyboardInterrupt too
error = e
finally:
info_tree.aggregate_info()

if is_tracking_enabled():
runtime = time.monotonic() - start
table1_count = self.stats.get("table1_count")
table2_count = self.stats.get("table2_count")
diff_count = self.stats.get("diff_count")
table1_count = info_tree.info.rowcounts[1]
table2_count = info_tree.info.rowcounts[2]
diff_count = info_tree.info.diff_count
err_message = truncate_error(repr(error))
event_json = create_end_event_json(
error is None,
Expand All @@ -138,23 +154,24 @@ def diff_tables(self, table1: TableSegment, table2: TableSegment) -> DiffResult:
def _validate_and_adjust_columns(self, table1: TableSegment, table2: TableSegment) -> DiffResult:
pass

def _diff_tables(self, table1: TableSegment, table2: TableSegment) -> DiffResult:
return self._bisect_and_diff_tables(table1, table2)
def _diff_tables_root(self, table1: TableSegment, table2: TableSegment, info_tree: InfoTree) -> DiffResult:
return self._bisect_and_diff_tables(table1, table2, info_tree)

@abstractmethod
def _diff_segments(
self,
ti: ThreadedYielder,
table1: TableSegment,
table2: TableSegment,
info_tree: InfoTree,
max_rows: int,
level=0,
segment_index=None,
segment_count=None,
):
...

def _bisect_and_diff_tables(self, table1, table2):
def _bisect_and_diff_tables(self, table1, table2, info_tree):
if len(table1.key_columns) > 1:
raise NotImplementedError("Composite key not supported yet!")
if len(table2.key_columns) > 1:
Expand Down Expand Up @@ -185,18 +202,18 @@ def _bisect_and_diff_tables(self, table1, table2):

ti = ThreadedYielder(self.max_threadpool_size)
# Bisect (split) the table into segments, and diff them recursively.
ti.submit(self._bisect_and_diff_segments, ti, table1, table2)
ti.submit(self._bisect_and_diff_segments, ti, table1, table2, info_tree)

# Now we check for the second min-max, to diff the portions we "missed".
min_key2, max_key2 = self._parse_key_range_result(key_type, next(key_ranges))

if min_key2 < min_key1:
pre_tables = [t.new(min_key=min_key2, max_key=min_key1) for t in (table1, table2)]
ti.submit(self._bisect_and_diff_segments, ti, *pre_tables)
ti.submit(self._bisect_and_diff_segments, ti, *pre_tables, info_tree)

if max_key2 > max_key1:
post_tables = [t.new(min_key=max_key1, max_key=max_key2) for t in (table1, table2)]
ti.submit(self._bisect_and_diff_segments, ti, *post_tables)
ti.submit(self._bisect_and_diff_segments, ti, *post_tables, info_tree)

return ti

Expand All @@ -210,7 +227,13 @@ def _parse_key_range_result(self, key_type, key_range):
raise type(e)(f"Cannot apply {key_type} to '{mn}', '{mx}'.") from e

def _bisect_and_diff_segments(
self, ti: ThreadedYielder, table1: TableSegment, table2: TableSegment, level=0, max_rows=None
self,
ti: ThreadedYielder,
table1: TableSegment,
table2: TableSegment,
info_tree: InfoTree,
level=0,
max_rows=None,
):
assert table1.is_bounded and table2.is_bounded

Expand All @@ -224,4 +247,7 @@ def _bisect_and_diff_segments(

# Recursively compare each pair of corresponding segments between table1 and table2
for i, (t1, t2) in enumerate(safezip(segmented1, segmented2)):
ti.submit(self._diff_segments, ti, t1, t2, max_rows, level + 1, i + 1, len(segmented1), priority=level)
info_node = info_tree.add_node(t1, t2, max_rows=max_rows)
ti.submit(
self._diff_segments, ti, t1, t2, info_node, max_rows, level + 1, i + 1, len(segmented1), priority=level
)
40 changes: 24 additions & 16 deletions data_diff/hashdiff_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

from runtype import dataclass

from data_diff.info_tree import InfoTree

from .utils import safezip
from .thread_utils import ThreadedYielder
from .sqeleton.databases import ColType_UUID, NumericType, PrecisionType, StringType
Expand Down Expand Up @@ -123,6 +125,7 @@ def _diff_segments(
ti: ThreadedYielder,
table1: TableSegment,
table2: TableSegment,
info_tree: InfoTree,
max_rows: int,
level=0,
segment_index=None,
Expand All @@ -140,10 +143,13 @@ def _diff_segments(
# the threshold) and _then_ download it.
if BENCHMARK:
if max_rows < self.bisection_threshold:
return self._bisect_and_diff_segments(ti, table1, table2, level=level, max_rows=max_rows)
return self._bisect_and_diff_segments(ti, table1, table2, info_tree, level=level, max_rows=max_rows)

(count1, checksum1), (count2, checksum2) = self._threaded_call("count_and_checksum", [table1, table2])

assert not info_tree.info.rowcounts
info_tree.info.rowcounts = {1: count1, 2: count2}

if count1 == 0 and count2 == 0:
logger.debug(
"Uneven distribution of keys detected in segment %s..%s (big gaps in the key column). "
Expand All @@ -152,42 +158,44 @@ def _diff_segments(
table1.max_key,
)
assert checksum1 is None and checksum2 is None
info_tree.info.is_diff = False
return

if level == 1:
self.stats["table1_count"] = self.stats.get("table1_count", 0) + count1
self.stats["table2_count"] = self.stats.get("table2_count", 0) + count2
if checksum1 == checksum2:
info_tree.info.is_diff = False
return

if checksum1 != checksum2:
return self._bisect_and_diff_segments(ti, table1, table2, level=level, max_rows=max(count1, count2))
info_tree.info.is_diff = True
return self._bisect_and_diff_segments(ti, table1, table2, info_tree, level=level, max_rows=max(count1, count2))

def _bisect_and_diff_segments(
self, ti: ThreadedYielder, table1: TableSegment, table2: TableSegment, level=0, max_rows=None
self,
ti: ThreadedYielder,
table1: TableSegment,
table2: TableSegment,
info_tree: InfoTree,
level=0,
max_rows=None,
):
assert table1.is_bounded and table2.is_bounded

max_space_size = max(table1.approximate_size(), table2.approximate_size())
if max_rows is None:
# We can be sure that row_count <= max_rows iff the table key is unique
max_rows = max_space_size
info_tree.info.max_rows = max_rows

# If count is below the threshold, just download and compare the columns locally
# This saves time, as bisection speed is limited by ping and query performance.
if max_rows < self.bisection_threshold or max_space_size < self.bisection_factor * 2:
rows1, rows2 = self._threaded_call("get_values", [table1, table2])
diff = list(diff_sets(rows1, rows2))

# Initial bisection_threshold larger than count. Normally we always
# checksum and count segments, even if we get the values. At the
# first level, however, that won't be true.
if level == 0:
self.stats["table1_count"] = len(rows1)
self.stats["table2_count"] = len(rows2)

self.stats["diff_count"] += len(diff)
info_tree.info.set_diff(diff)
info_tree.info.rowcounts = {1: len(rows1), 2: len(rows2)}

logger.info(". " * level + f"Diff found {len(diff)} different rows.")
self.stats["rows_downloaded"] = self.stats.get("rows_downloaded", 0) + max(len(rows1), len(rows2))
return diff

return super()._bisect_and_diff_segments(ti, table1, table2, level, max_rows)
return super()._bisect_and_diff_segments(ti, table1, table2, info_tree, level, max_rows)
53 changes: 53 additions & 0 deletions data_diff/info_tree.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from typing import Tuple, List, Dict
from itertools import chain

from runtype import dataclass

from .table_segment import TableSegment


@dataclass(frozen=False)
class SegmentInfo:
    """Mutable record of what is known about one pair of compared table segments.

    Fields start out unset and are filled in as the segment is processed:
    either directly (`set_diff`, or by assigning `rowcounts` / `is_diff`), or
    by folding the records of its sub-segments together
    (`update_from_children`).
    """

    # The segments being compared, in order: [table1, table2].
    tables: List[TableSegment]

    # Differing rows found for this segment; None until `set_diff` is called.
    diff: list = None
    # Whether the two segments differ; None while still undetermined.
    is_diff: bool = None
    # Number of differing rows; None while still undetermined.
    diff_count: int = None

    # Row count per table, keyed by 1 and 2. Stays empty until the segment has
    # actually been counted.
    # NOTE(review): relies on the dataclass implementation giving each instance
    # its own dict for this mutable default - confirm against runtype.
    rowcounts: Dict[int, int] = {}
    # Row bound supplied by the caller for this segment, if any.
    max_rows: int = None

    def set_diff(self, diff):
        """Store the differing rows and derive `diff_count` and `is_diff` from them."""
        self.diff = diff
        self.diff_count = len(diff)
        self.is_diff = self.diff_count > 0

    def update_from_children(self, child_infos):
        """Recompute this record's counters from the records of its sub-segments.

        Parameters:
            child_infos: iterable of SegmentInfo, one per child; must be non-empty.

        Children whose `diff_count` is still None, or whose `rowcounts` is
        still empty, were never fully processed (for example because the diff
        failed or was stopped early). They are skipped instead of raising, so
        that aggregation performed during error handling cannot mask the
        original exception with a KeyError.
        """
        child_infos = list(child_infos)
        assert child_infos

        # Only the counters are aggregated; the per-row `diff` lists of the
        # children are not merged into `self.diff`.
        self.diff_count = sum(c.diff_count for c in child_infos if c.diff_count is not None)
        self.is_diff = any(c.is_diff for c in child_infos)

        counted = [c.rowcounts for c in child_infos if c.rowcounts]
        self.rowcounts = {
            1: sum(rc[1] for rc in counted),
            2: sum(rc[2] for rc in counted),
        }


@dataclass
class InfoTree:
    """A tree node holding the SegmentInfo of one segment pair and its sub-nodes."""

    info: SegmentInfo
    # NOTE(review): `add_node` appends to this list in place, so each instance
    # must receive its own list for this default - confirm runtype guarantees it.
    children: List["InfoTree"] = []

    def add_node(self, table1: TableSegment, table2: TableSegment, max_rows: int = None):
        """Create a child node for the given segment pair, attach it, and return it."""
        segment_info = SegmentInfo([table1, table2], max_rows=max_rows)
        child = InfoTree(segment_info)
        self.children.append(child)
        return child

    def aggregate_info(self):
        """Aggregate info bottom-up: first each child's subtree, then this node.

        Nodes without children are left untouched.
        """
        if not self.children:
            return

        for child in self.children:
            child.aggregate_info()
        self.info.update_from_children(child.info for child in self.children)
Loading