Skip to content
This repository was archived by the owner on May 17, 2024. It is now read-only.

yield list vs. indiv tuples #805

Merged
merged 1 commit into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions data_diff/diff_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from enum import Enum
from contextlib import contextmanager
from operator import methodcaller
from typing import Dict, Set, Tuple, Iterator, Optional
from typing import Dict, Set, List, Tuple, Iterator, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed

import attrs
Expand All @@ -28,6 +28,7 @@ class Algorithm(Enum):


DiffResult = Iterator[Tuple[str, tuple]] # Iterator[Tuple[Literal["+", "-"], tuple]]
DiffResultList = Iterator[List[Tuple[str, tuple]]]


@attrs.define(frozen=False)
Expand Down Expand Up @@ -187,6 +188,7 @@ class TableDiffer(ThreadBase, ABC):
ignored_columns1: Set[str] = attrs.field(factory=set)
ignored_columns2: Set[str] = attrs.field(factory=set)
_ignored_columns_lock: threading.Lock = attrs.field(factory=threading.Lock, init=False)
yield_list: bool = False

def diff_tables(self, table1: TableSegment, table2: TableSegment, info_tree: InfoTree = None) -> DiffResultWrapper:
"""Diff the given tables.
Expand Down Expand Up @@ -255,7 +257,9 @@ def _diff_tables_wrapper(self, table1: TableSegment, table2: TableSegment, info_
def _validate_and_adjust_columns(self, table1: TableSegment, table2: TableSegment) -> None:
pass

def _diff_tables_root(self, table1: TableSegment, table2: TableSegment, info_tree: InfoTree) -> DiffResult:
def _diff_tables_root(
self, table1: TableSegment, table2: TableSegment, info_tree: InfoTree
) -> DiffResult | DiffResultList:
return self._bisect_and_diff_tables(table1, table2, info_tree)

@abstractmethod
Expand Down Expand Up @@ -300,9 +304,9 @@ def _bisect_and_diff_tables(self, table1: TableSegment, table2: TableSegment, in
f"size: table1 <= {btable1.approximate_size()}, table2 <= {btable2.approximate_size()}"
)

ti = ThreadedYielder(self.max_threadpool_size)
ti = ThreadedYielder(self.max_threadpool_size, self.yield_list)
# Bisect (split) the table into segments, and diff them recursively.
ti.submit(self._bisect_and_diff_segments, ti, btable1, btable2, info_tree)
ti.submit(self._bisect_and_diff_segments, ti, btable1, btable2, info_tree, priority=999)

# Now we check for the second min-max, to diff the portions we "missed".
# This is achieved by subtracting the table ranges, and dividing the resulting space into aligned boxes.
Expand All @@ -326,7 +330,7 @@ def _bisect_and_diff_tables(self, table1: TableSegment, table2: TableSegment, in

for p1, p2 in new_regions:
extra_tables = [t.new_key_bounds(min_key=p1, max_key=p2) for t in (table1, table2)]
ti.submit(self._bisect_and_diff_segments, ti, *extra_tables, info_tree)
ti.submit(self._bisect_and_diff_segments, ti, *extra_tables, info_tree, priority=999)

return ti

Expand Down
9 changes: 7 additions & 2 deletions data_diff/thread_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,24 @@ class ThreadedYielder(Iterable):
_futures: deque
_yield: deque = attrs.field(alias="_yield") # Python keyword!
_exception: Optional[None]
yield_list: bool

def __init__(self, max_workers: Optional[int] = None):
def __init__(self, max_workers: Optional[int] = None, yield_list: bool = False):
super().__init__()
self._pool = PriorityThreadPoolExecutor(max_workers)
self._futures = deque()
self._yield = deque()
self._exception = None
self.yield_list = yield_list

def _worker(self, fn, *args, **kwargs):
try:
res = fn(*args, **kwargs)
if res is not None:
self._yield += res
if self.yield_list:
self._yield.append(res)
else:
self._yield += res
except Exception as e:
self._exception = e

Expand Down