6
6
from enum import Enum
7
7
from contextlib import contextmanager
8
8
from operator import methodcaller
9
- from typing import Dict , Set , Tuple , Iterator , Optional
9
+ from typing import Dict , Set , List , Tuple , Iterator , Optional
10
10
from concurrent .futures import ThreadPoolExecutor , as_completed
11
11
12
12
import attrs
@@ -28,6 +28,7 @@ class Algorithm(Enum):
28
28
29
29
30
30
DiffResult = Iterator [Tuple [str , tuple ]] # Iterator[Tuple[Literal["+", "-"], tuple]]
31
+ DiffResultList = Iterator [List [Tuple [str , tuple ]]]
31
32
32
33
33
34
@attrs .define (frozen = False )
@@ -187,6 +188,7 @@ class TableDiffer(ThreadBase, ABC):
187
188
ignored_columns1 : Set [str ] = attrs .field (factory = set )
188
189
ignored_columns2 : Set [str ] = attrs .field (factory = set )
189
190
_ignored_columns_lock : threading .Lock = attrs .field (factory = threading .Lock , init = False )
191
+ yield_list : bool = False
190
192
191
193
def diff_tables (self , table1 : TableSegment , table2 : TableSegment , info_tree : InfoTree = None ) -> DiffResultWrapper :
192
194
"""Diff the given tables.
@@ -255,7 +257,9 @@ def _diff_tables_wrapper(self, table1: TableSegment, table2: TableSegment, info_
255
257
def _validate_and_adjust_columns (self , table1 : TableSegment , table2 : TableSegment ) -> None :
256
258
pass
257
259
258
- def _diff_tables_root (self , table1 : TableSegment , table2 : TableSegment , info_tree : InfoTree ) -> DiffResult :
260
+ def _diff_tables_root (
261
+ self , table1 : TableSegment , table2 : TableSegment , info_tree : InfoTree
262
+ ) -> DiffResult | DiffResultList :
259
263
return self ._bisect_and_diff_tables (table1 , table2 , info_tree )
260
264
261
265
@abstractmethod
@@ -300,9 +304,9 @@ def _bisect_and_diff_tables(self, table1: TableSegment, table2: TableSegment, in
300
304
f"size: table1 <= { btable1 .approximate_size ()} , table2 <= { btable2 .approximate_size ()} "
301
305
)
302
306
303
- ti = ThreadedYielder (self .max_threadpool_size )
307
+ ti = ThreadedYielder (self .max_threadpool_size , self . yield_list )
304
308
# Bisect (split) the table into segments, and diff them recursively.
305
- ti .submit (self ._bisect_and_diff_segments , ti , btable1 , btable2 , info_tree )
309
+ ti .submit (self ._bisect_and_diff_segments , ti , btable1 , btable2 , info_tree , priority = 999 )
306
310
307
311
# Now we check for the second min-max, to diff the portions we "missed".
308
312
# This is achieved by subtracting the table ranges, and dividing the resulting space into aligned boxes.
@@ -326,7 +330,7 @@ def _bisect_and_diff_tables(self, table1: TableSegment, table2: TableSegment, in
326
330
327
331
for p1 , p2 in new_regions :
328
332
extra_tables = [t .new_key_bounds (min_key = p1 , max_key = p2 ) for t in (table1 , table2 )]
329
- ti .submit (self ._bisect_and_diff_segments , ti , * extra_tables , info_tree )
333
+ ti .submit (self ._bisect_and_diff_segments , ti , * extra_tables , info_tree , priority = 999 )
330
334
331
335
return ti
332
336
0 commit comments