|
| 1 | +# Copyright 2021 Joschua Gandert |
| 2 | +# |
| 3 | +# This program is free software; you can redistribute it and/or modify |
| 4 | +# it under the terms of the GNU General Public License as published by |
| 5 | +# the Free Software Foundation; either version 2 of the License, or |
| 6 | +# (at your option) any later version. |
| 7 | + |
| 8 | +from difflib import SequenceMatcher |
| 9 | +from operator import itemgetter |
| 10 | +from typing import Generic, TypeVar, Mapping, Callable, Any, Union, List, \ |
| 11 | + Sequence, Optional, Tuple |
| 12 | + |
| 13 | + |
# Type variable for the objects being matched (both lists hold the same type).
T = TypeVar('T')
# A plain real number, as used for weights and numeric attribute values.
Real = Union[int, float]
# A function extracting one comparable attribute from an object.
AttributeGetter = Callable[[T], Any]
# Maps each attribute getter to its (not yet normalized) weight.
AttributeGetterToWeight = Mapping[AttributeGetter, Real]

# Kept as a tuple because isinstance() cannot take a typing.Union.
SUPPORTED_NUMBER_TYPES = (int, float)
| 20 | + |
| 21 | + |
| 22 | +class _MatchData: |
| 23 | + def __init__(self, a_idx, a_value, b_size): |
| 24 | + self.a_idx = a_idx |
| 25 | + self.a_value = a_value |
| 26 | + self.b_idx_to_similarity = [0 for _ in range(b_size)] |
| 27 | + |
| 28 | + self.best_b_idx = -1 |
| 29 | + self.best_b_similarity = float('-inf') |
| 30 | + self.second_best_b_idx = -1 |
| 31 | + self.second_best_b_similarity = float('-inf') |
| 32 | + self.found_best = False |
| 33 | + |
| 34 | + self.continue_attr_index = 0 |
| 35 | + |
| 36 | + # rarely needed since in a lot of cases there won't be conflicts |
| 37 | + self._sorted_b_similarity_with_idx_pairs = None |
| 38 | + self.is_fully_measured = False |
| 39 | + |
| 40 | + def add_similarity(self, b_idx, similarity_part): |
| 41 | + self.b_idx_to_similarity[b_idx] += similarity_part |
| 42 | + b_idx_total_similarity = self.b_idx_to_similarity[b_idx] |
| 43 | + |
| 44 | + if b_idx_total_similarity > self.best_b_similarity: |
| 45 | + self.second_best_b_idx = self.best_b_idx |
| 46 | + self.second_best_b_similarity = self.best_b_similarity |
| 47 | + |
| 48 | + self.best_b_idx = b_idx |
| 49 | + self.best_b_similarity = b_idx_total_similarity |
| 50 | + elif b_idx_total_similarity > self.second_best_b_similarity: |
| 51 | + self.second_best_b_idx = b_idx |
| 52 | + self.second_best_b_similarity = b_idx_total_similarity |
| 53 | + |
| 54 | + def set_to_fully_measured(self): |
| 55 | + self.is_fully_measured = True |
| 56 | + |
| 57 | + # We sort so last = best as calling pop() on a list is O(1) |
| 58 | + # We negate the index here, so that the sorting is correct if the |
| 59 | + # similarities of two b's are the same (lowest index last) |
| 60 | + self._sorted_b_similarity_with_idx_pairs = sorted( |
| 61 | + ((s, -i) for i, s in enumerate(self.b_idx_to_similarity))) |
| 62 | + |
| 63 | + def replace_best(self): |
| 64 | + if not self._sorted_b_similarity_with_idx_pairs: |
| 65 | + return |
| 66 | + |
| 67 | + self._sorted_b_similarity_with_idx_pairs.pop() |
| 68 | + if not self._sorted_b_similarity_with_idx_pairs: |
| 69 | + # None left, which means all others a's have better values |
| 70 | + # than this one. As a result, this a will stay alone :( |
| 71 | + self.best_b_idx = -1 |
| 72 | + self.best_b_similarity = float('-inf') |
| 73 | + return |
| 74 | + |
| 75 | + sim, idx = self._sorted_b_similarity_with_idx_pairs[-1] |
| 76 | + self.best_b_idx = -idx |
| 77 | + self.best_b_similarity = sim |
| 78 | + |
| 79 | + |
class ObjectListMatcher(Generic[T]):
    """
    Utility class that compares the objects of two lists (all the same type), and finds
    the best matches. What is considered a good match depends on the supplied mapping,
    which maps "attribute getter" functions to their weight.

    Weights are normalized, so you can use values > 1.

    A specific getter has to return objects of only one of these types:
    - Real number (e.g. int or float) - compared by 1 - abs(delta) / max_delta
    - Other non-Sequence - converted to str which is a Sequence and then..
    - Sequence - compared using difflib.SequenceMatcher

    The problem this solves is called the "Assignment problem".

    Note that this class will not necessarily call every attribute function and thus
    not use its weight in the calculation, as it will avoid doing unnecessary
    calculations when the current best match for an element is undefeatable.

    For example, let's say we have an attribute x with weight 0.8 and an attribute y
    with weight 0.2. If, after checking the similarity on attribute x with all items,
    the highest similarity is 0.8 (it was equal to the compared item) and the second
    highest has a similarity of 0.5, then we won't check attribute y at all, as the
    second best cannot possibly win with the remaining weight (0.5 + 0.2 < 0.8)."""

    _matcher: SequenceMatcher
    _attr_with_weight: List[Tuple[Callable[[T], Any], float]]
    similarity_matrix: List[List[float]]
    _b_idx_to_a_match_data: List[Optional[_MatchData]]

    def __init__(self, attr_to_weight: AttributeGetterToWeight):
        self.update_attr_to_weights(attr_to_weight)
        self._matcher = SequenceMatcher()
        self.should_store_similarity_matrix = False
        """Whether or not to store the similarity_matrix."""

        self.similarity_matrix = []
        """This will only be populated if should_store_similarity_matrix is True, and
        then every time get_indices is called.
        It's a matrix that has the same number of elements as a_items. Each of those
        lists has the same number of elements as b_items. Those elements represent how
        similar an a item is to each b item. So matrix[a_idx][b_idx] gives you the
        similarity between the a item and b item at those indices."""

        self.minimum_similarity = 0
        """A value in the range [0, 1] that defines what similarity is required to be a
        valid match. Setting this to a reasonable value will speed up the calculation
        as low-quality matches will be discarded before a match conflict can even arise.
        This has to be carefully chosen with regard to the weights, as this class will
        not necessarily incorporate all (but the largest) weights in the score.
        So, if you set this to 0.8, but your largest weight is 0.5, it's possible that
        a match will almost never be discarded. This can be fine, of course."""

        # This has not been tested heavily, so there might be some bugs!
        self.lock_in_if_similarity_first_above = 1  # FIXME: test this more
        """If the similarity is above this value, we lock it in as the best match.
        Improves performance by removing alternatives from all following elements."""

    @classmethod
    def for_sequence(cls, weights: Sequence[Real]):
        """Creates a matcher where itemgetter(n) is mapped to weight_list[n]"""
        attr_to_weight = {itemgetter(n): w for n, w in enumerate(weights)}
        # use cls (not the class name) so subclasses get instances of
        # themselves, consistent with of_identity
        return cls(attr_to_weight)

    @classmethod
    def for_one_attr(cls, attr: AttributeGetter):
        """Creates a matcher where a single attribute of the objects are compared."""
        return cls({attr: 1})

    @classmethod
    def of_identity(cls):
        """Creates a matcher where only the objects themselves are compared."""
        return cls.for_one_attr((lambda i: i))

    def update_attr_to_weights(self, attr_to_weight: AttributeGetterToWeight):
        """Replaces the attribute getters and weights used for matching.

        Weights are normalized to sum to 1 and sorted from largest to
        smallest, which is what makes early stopping possible.

        :raises ValueError: if the mapping is empty or any weight is <= 0
        """
        if not attr_to_weight:
            raise ValueError("there must be at least one weight")

        # normalize
        weight_sum = sum(attr_to_weight.values())

        self._attr_with_weight = []
        for attr, weight in attr_to_weight.items():
            if weight <= 0:
                raise ValueError("weights <= 0 are not allowed")

            self._attr_with_weight.append((attr, weight / weight_sum))

        # from largest to smallest weight
        self._attr_with_weight.sort(key=lambda i: i[1], reverse=True)

        # _attr_to_weight_left[i] is the sum of the weights from attribute i
        # onward, i.e. the maximum similarity that can still be gained once
        # attributes 0..i-1 have been measured
        weight_left = 1.0
        self._attr_to_weight_left = []
        for _, weight in self._attr_with_weight:
            self._attr_to_weight_left.append(weight_left)
            weight_left -= weight

    def get_indices(self, a_items: List[T], b_items: List[T]) -> List[int]:
        """
        Returns the indices of b ordered so that they match elements in a.

        Size of a_items and b_items can differ. If there are more a_items than
        there are b_items, -1 is used if no match could be assigned to an a.
        As a result, the returned list always has the size of a_items.

        In terms of performance, it's preferable to supply the smaller list as a_items.
        """
        if not b_items:
            return [-1] * len(a_items)

        # pairs of (original b index, b value); locked-in b's get deleted from
        # this list, which is why the original index must be carried along
        self._b_items = list(enumerate(b_items))
        b_size = len(b_items)

        self._b_idx_to_a_match_data = [None] * b_size
        self._a_idx_to_match_data = []

        for a_idx, a in enumerate(a_items):
            match_data = _MatchData(a_idx, a, b_size)
            self._a_idx_to_match_data.append(match_data)

            self._measure_similarity_to_find_best_b_match(match_data)

            if match_data.best_b_similarity < self.minimum_similarity:
                match_data.best_b_idx = -1
            elif match_data.found_best:
                # locked in: the matched b was already removed from
                # self._b_items, so later a's cannot conflict with it
                self._b_idx_to_a_match_data[match_data.best_b_idx] = match_data
            else:
                self._handle_conflicts_if_any(match_data)

        result = [md.best_b_idx for md in self._a_idx_to_match_data]

        if self.should_store_similarity_matrix:
            matrix = [md.b_idx_to_similarity for md in
                      self._a_idx_to_match_data]
            self.similarity_matrix = matrix

        # cleanup
        del self._b_items
        del self._b_idx_to_a_match_data
        del self._a_idx_to_match_data

        return result

    def _handle_conflicts_if_any(self, a1_match_data):
        """Assigns a1 to its best b, resolving conflicts with previous a's.

        Whenever two a's want the same b, the worse of the two falls back to
        its next best b, which may trigger a cascade of further conflicts."""
        while True:
            best_b_idx = a1_match_data.best_b_idx
            if best_b_idx == -1:
                # a1 has no matches left or could not find a match
                return

            a2_match_data = self._b_idx_to_a_match_data[best_b_idx]
            if a2_match_data is None:
                self._b_idx_to_a_match_data[best_b_idx] = a1_match_data
                return

            # We have found a conflict and will now solve it.
            a1_match_data = self._get_worse_match_data(a1_match_data,
                                                       a2_match_data)

    def _get_worse_match_data(self, a1_match_data, a2_match_data):
        """Decides which of two conflicting a's keeps the contested b and
        returns the loser, whose best b has been replaced by its next best."""
        # b is matched to a previous a (a2), so we have to find a better match
        self._finish_similarity_measures(a1_match_data)

        if a2_match_data.found_best:
            # a2 is locked in and can never lose its b
            a1_match_data.replace_best()
            # still need to find a better match for a1

            return a1_match_data

        self._finish_similarity_measures(a2_match_data)

        # As the index of a1 will almost always be larger than a2, we use <=
        # here, since in case they're equal in terms of similarity, we want to
        # give some weight to the current order of b (index 0 preferred to 1).
        if a1_match_data.best_b_similarity <= a2_match_data.best_b_similarity:
            a1_match_data.replace_best()

            # still need to find a better match for a1
            return a1_match_data
        else:
            # a1 is better so replace a2 in map
            best_b_idx = a1_match_data.best_b_idx
            self._b_idx_to_a_match_data[best_b_idx] = a1_match_data

            a2_match_data.replace_best()

            # we need to find a new match for a2
            return a2_match_data

    def _finish_similarity_measures(self, a_match_data):
        """Only if this was called, is match data fully measured and only then
        can match_data.replace_best() be called."""
        if a_match_data.is_fully_measured:
            return

        # figure out total similarity (without stopping) if we didn't
        self._measure_similarity_to_find_best_b_match(a_match_data)
        a_match_data.set_to_fully_measured()

    def _measure_similarity_to_find_best_b_match(self, a_match_data):
        """Measures similarity attribute by attribute, stopping early when no
        other b can beat the current best anymore."""
        continue_attr_idx = a_match_data.continue_attr_index
        # only a fresh measurement may stop early; a resumed one (finishing a
        # partially measured a during conflict resolution) must measure all
        can_stop = continue_attr_idx == 0
        attr_size = len(self._attr_with_weight)

        for attr_idx in range(continue_attr_idx, attr_size):
            if a_match_data.found_best:
                a_match_data.continue_attr_index = attr_size
                return

            # stop if one has more similarity than is possible for the rest
            if can_stop and self._is_max_similarity_undefeatable(attr_idx,
                                                                 a_match_data):
                a_match_data.continue_attr_index = attr_idx
                return

            self._measure_similarity_for_attr(attr_idx, a_match_data)

        a_match_data.continue_attr_index = attr_size

    def _is_max_similarity_undefeatable(self, attr_idx, a_match_data):
        # Must only be called at the start of the loop.
        # We need a second best, so we can check if they have any chance.
        if a_match_data.second_best_b_idx < 0:
            return False

        # Any similarity has to be in [0, 1], so the following is the maximum
        # similarity that could be achieved with the remaining attr / weights
        optimal_second_best_similarity = self._attr_to_weight_left[attr_idx]

        # Now we add the second best similarity, since we want to know if it's
        # even possible for the second best to win against the current best
        optimal_second_best_similarity += a_match_data.second_best_b_similarity

        return optimal_second_best_similarity < a_match_data.best_b_similarity

    def _measure_similarity_for_attr(self, attr_idx, a_match_data):
        """Measures the similarity of a to every remaining b for a single
        attribute, dispatching on the attribute's type."""
        get_attr, weight = self._attr_with_weight[attr_idx]
        a_attr = get_attr(a_match_data.a_value)

        # isinstance does not work with Union, so we need to use a tuple here
        if isinstance(a_attr, SUPPORTED_NUMBER_TYPES):
            self._add_number_similarity(a_match_data, a_attr, get_attr, weight)
            return

        if not isinstance(a_attr, Sequence):
            a_attr = str(a_attr)

            def get_attr(obj, original_get_attr=get_attr):
                return str(original_get_attr(obj))

            # replace it so we don't have to do this again for this attr_idx
            self._attr_with_weight[attr_idx] = (get_attr, weight)

        self._add_sequence_similarity(a_match_data, a_attr, get_attr, weight)

    def _add_number_similarity(self, a_match_data, a_attr, get_attr, weight):
        """Adds weighted similarities based on how close each b's value is to
        a's value, relative to the largest absolute difference."""
        b_items = self._b_items
        deltas = [abs(get_attr(b) - a_attr) for _, b in b_items]
        if not deltas:
            # every b was already locked in by a previous a
            return

        max_delta = max(deltas)
        if max_delta == 0:
            if a_match_data.best_b_idx == -1:
                # We try to set a match here manually to avoid the situation where
                # nothing is matched to a and -1 returned just because all are the same.
                # In all other cases MatchData's add_similarity takes care of this.
                for b_idx, match_data in enumerate(
                        self._b_idx_to_a_match_data):
                    # find the first b index that's not matched to an a element
                    if match_data is None:
                        a_match_data.best_b_idx = b_idx
                        a_match_data.best_b_similarity = 0
                        break

            # no delta, so all the same and no change in similarity
            return  # also let's avoid division by zero

        # Iterate the (original index, value) pairs in lockstep with deltas.
        # Using enumerate(deltas) here was a bug: once lock-in has deleted an
        # entry from self._b_items, positions no longer equal the original b
        # indices, so similarity would be credited to the wrong b.
        for (b_idx, _), delta in zip(b_items, deltas):
            # if delta is small relative to max, similarity is higher
            # for example, if delta is 0, b_attr is the same as a_attr
            similarity = weight * (1 - delta / max_delta)
            a_match_data.add_similarity(b_idx, similarity)

    def _add_sequence_similarity(self, a_match_data, a_attr, get_attr, weight):
        """Adds weighted similarities based on difflib.SequenceMatcher ratios.

        If the lock-in threshold is exceeded, the matched b is removed from
        self._b_items so no following a can claim it."""
        matcher = self._matcher
        if a_match_data.found_best:
            return

        # set_seq2 is used here as it caches information (contrary to 1)
        matcher.set_seq2(a_attr)

        lock_in_min = self.lock_in_if_similarity_first_above

        b_items = self._b_items

        for i in range(len(b_items)):
            b_idx, b = b_items[i]
            b_attr = get_attr(b)

            matcher.set_seq1(b_attr)

            a_match_data.add_similarity(b_idx, weight * matcher.ratio())

            if (a_match_data.best_b_idx == b_idx and  #
                    a_match_data.best_b_similarity > lock_in_min and
                    a_attr and b_attr):
                # deleting mid-iteration is safe because we return immediately
                del b_items[i]
                a_match_data.found_best = True
                return
0 commit comments