Cms for categorical #892 (Merged)
Changes from all commits (26 commits):
3d25015  WIP cms implementation (tyfarnan)
94e53ec  add heavy hitters implementation (tyfarnan)
1ff4d02  add heavy hitters implementation (tyfarnan)
8c750a6  WIP: mypy issue (tyfarnan)
2f27a92  WIP: mypy issue (tyfarnan)
b7feeb5  add cms bool and refactor options handler (tyfarnan)
466c76a  WIP: testing for CMS (tyfarnan)
aeb19ae  WIP: testing for CMS (tyfarnan)
e7f84d1  use new heavy_hitters_threshold, add test for it (tyfarnan)
3132b0e  feat: add dev to workfow for testing (#897) (JGSweets)
3b307b8  allow for hybrid handling of stream and batch for CMS (tyfarnan)
085024a  allow for hybrid handling of stream and batch for CMS (tyfarnan)
efa958b  add tests for edge cases and various ways of merging profiles with cms (tyfarnan)
05526c6  Merge branch 'dev' into cms_for_categorical (tyfarnan)
9aa87e7  include new CMS options, add test condition for them (tyfarnan)
abb4db8  var renames, add conditions for adding profiles, remove serialization (tyfarnan)
ddd212e  update tests for new conditions (tyfarnan)
3be6229  handle case where two profiles have different max_num_heavy_hitters (tyfarnan)
16e1247  change default test value for max_num_heavy_hitters (tyfarnan)
b91384b  move edge case condition inside _merge_categorical_cms, modify test t… (tyfarnan)
4666681  add tests for edge cases, refactor (tyfarnan)
dacbb1a  scipy req limit (tyfarnan)
f843a0f  update docstrings for new CMS params (tyfarnan)
7785d97  update default val in expected results (tyfarnan)
66f30a5  Merge branch 'dev' into cms_for_categorical (tyfarnan)
81ac139  Update dataprofiler/profilers/categorical_column_profile.py (tyfarnan)
File: dataprofiler/profilers/categorical_column_profile.py

```diff
@@ -5,6 +5,7 @@
 from operator import itemgetter
 from typing import cast
 
+import datasketches
 from pandas import DataFrame, Series
 
 from . import BaseColumnProfiler, utils
```
```diff
@@ -52,6 +53,11 @@ def __init__(self, name: str | None, options: CategoricalOptions = None) -> None
         self._stopped_at_unique_ratio: float | None = None
         self._stopped_at_unique_count: int | None = None
 
+        self._cms_max_num_heavy_hitters: int | None = 5000
+        self.cms_num_hashes: int | None = None
+        self.cms_num_buckets: int | None = None
+        self.cms: datasketches.countminsketch | None = None
+
         if options:
             self._top_k_categories = options.top_k_categories
             self.stop_condition_unique_value_ratio = (
```
```diff
@@ -61,6 +67,20 @@ def __init__(self, name: str | None, options: CategoricalOptions = None) -> None
                 options.max_sample_size_to_check_stop_condition
             )
 
+            if options.cms:
+                self._cms_max_num_heavy_hitters = options.cms_max_num_heavy_hitters
+                self.cms_num_hashes = datasketches.count_min_sketch.suggest_num_hashes(
+                    options.cms_confidence
+                )
+                self.cms_num_buckets = (
+                    datasketches.count_min_sketch.suggest_num_buckets(
+                        options.cms_relative_error
+                    )
+                )
+                self.cms = datasketches.count_min_sketch(
+                    self.cms_num_hashes, self.cms_num_buckets
+                )
+
     def __add__(self, other: CategoricalColumn) -> CategoricalColumn:
         """
         Merge the properties of two CategoricalColumn profiles.
```
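The `suggest_num_hashes` / `suggest_num_buckets` calls above translate the user-facing `cms_confidence` and `cms_relative_error` options into sketch dimensions. As a rough illustration only (a pure-Python stand-in using the classic count-min sizing, width ⌈e/ε⌉ and depth ⌈ln(1/(1-confidence))⌉, not the actual datasketches implementation), the mechanics look like this:

```python
import hashlib
import math


def suggest_num_buckets(relative_error: float) -> int:
    # classic count-min sizing: width w = ceil(e / epsilon)
    return math.ceil(math.e / relative_error)


def suggest_num_hashes(confidence: float) -> int:
    # depth d = ceil(ln(1 / (1 - confidence)))
    return math.ceil(math.log(1.0 / (1.0 - confidence)))


class CountMinSketch:
    """Toy count-min sketch: d hash rows of w counters each."""

    def __init__(self, num_hashes: int, num_buckets: int):
        self.d, self.w = num_hashes, num_buckets
        self.table = [[0] * num_buckets for _ in range(num_hashes)]

    def _index(self, row: int, value: str) -> int:
        # derive a per-row bucket index from a salted hash of the value
        digest = hashlib.sha256(f"{row}:{value}".encode()).digest()
        return int.from_bytes(digest[:8], "big") % self.w

    def update(self, value: str) -> None:
        for row in range(self.d):
            self.table[row][self._index(row, value)] += 1

    def get_estimate(self, value: str) -> int:
        # min over rows: never under-estimates the true count
        return min(self.table[row][self._index(row, value)] for row in range(self.d))
```

A sketch sized this way guarantees (with the stated confidence) that every estimate overshoots the true count by at most ε times the total number of updates.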
```diff
@@ -83,53 +103,88 @@ def __add__(self, other: CategoricalColumn) -> CategoricalColumn:
         self._merge_calculations(
             merged_profile.__calculations, self.__calculations, other.__calculations
         )
-        # If both profiles have not met stop condition
-        if not (self._stop_condition_is_met or other._stop_condition_is_met):
-            merged_profile._categories = utils.add_nested_dictionaries(
-                self._categories, other._categories
-            )
-
-            # Transfer stop condition variables of 1st profile object to merged profile
-            # if they are not None else set to 2nd profile
-            profile1_product = self.sample_size * self.unique_ratio
-            profile2_product = other.sample_size * other.unique_ratio
-            if profile1_product < profile2_product:
-                merged_profile.max_sample_size_to_check_stop_condition = (
-                    self.max_sample_size_to_check_stop_condition
-                )
-                merged_profile.stop_condition_unique_value_ratio = (
-                    self.stop_condition_unique_value_ratio
-                )
-            else:
-                merged_profile.stop_condition_unique_value_ratio = (
-                    other.stop_condition_unique_value_ratio
-                )
-                merged_profile.max_sample_size_to_check_stop_condition = (
-                    other.max_sample_size_to_check_stop_condition
-                )
-
-            # Check merged profile w/ stop condition
-            if merged_profile._check_stop_condition_is_met(
-                merged_profile.sample_size, merged_profile.unique_ratio
-            ):
-                merged_profile._stopped_at_unique_ratio = merged_profile.unique_ratio
-                merged_profile._stopped_at_unique_count = merged_profile.unique_count
-                merged_profile._categories = {}
-                merged_profile._stop_condition_is_met = True
-
-        else:
-            if self.sample_size > other.sample_size:
-                merged_profile._stopped_at_unique_ratio = self.unique_ratio
-                merged_profile._stopped_at_unique_count = self.unique_count
-                merged_profile.sample_size = self.sample_size
-            else:
-                merged_profile._stopped_at_unique_ratio = other.unique_ratio
-                merged_profile._stopped_at_unique_count = other.unique_count
-                merged_profile.sample_size = other.sample_size
-
-            # If either profile has hit stop condition, remove categories dict
-            merged_profile._categories = {}
-            merged_profile._stop_condition_is_met = True
+        if self.cms and other.cms:
+            assert isinstance(self._cms_max_num_heavy_hitters, int)
+            assert isinstance(other._cms_max_num_heavy_hitters, int)
+            cms_max_num_heavy_hitters: int = min(
+                self._cms_max_num_heavy_hitters, other._cms_max_num_heavy_hitters
+            )
+
+            (
+                merged_profile.cms,
+                merged_profile._categories,
+                merged_profile._cms_max_num_heavy_hitters,
+            ) = self._merge_categories_cms(
+                self.cms,
+                self._categories,
+                self.sample_size,
+                {},
+                other.cms,
+                other._categories,
+                other.sample_size,
+                cms_max_num_heavy_hitters,
+            )
+
+        elif not self.cms and not other.cms:
+            # If both profiles have not met stop condition
+            if not (self._stop_condition_is_met or other._stop_condition_is_met):
+                merged_profile._categories = utils.add_nested_dictionaries(
+                    self._categories, other._categories
+                )
+
+                # Transfer stop condition variables of 1st profile object to
+                # merged profile if they are not None else set to 2nd profile
+                profile1_product = self.sample_size * self.unique_ratio
+                profile2_product = other.sample_size * other.unique_ratio
+                if profile1_product < profile2_product:
+                    merged_profile.max_sample_size_to_check_stop_condition = (
+                        self.max_sample_size_to_check_stop_condition
+                    )
+                    merged_profile.stop_condition_unique_value_ratio = (
+                        self.stop_condition_unique_value_ratio
+                    )
+                else:
+                    merged_profile.stop_condition_unique_value_ratio = (
+                        other.stop_condition_unique_value_ratio
+                    )
+                    merged_profile.max_sample_size_to_check_stop_condition = (
+                        other.max_sample_size_to_check_stop_condition
+                    )
+
+                # Check merged profile w/ stop condition
+                if merged_profile._check_stop_condition_is_met(
+                    merged_profile.sample_size, merged_profile.unique_ratio
+                ):
+                    merged_profile._stopped_at_unique_ratio = (
+                        merged_profile.unique_ratio
+                    )
+                    merged_profile._stopped_at_unique_count = (
+                        merged_profile.unique_count
+                    )
+                    merged_profile._categories = {}
+                    merged_profile._stop_condition_is_met = True
+
+            else:
+                if self.sample_size > other.sample_size:
+                    merged_profile._stopped_at_unique_ratio = self.unique_ratio
+                    merged_profile._stopped_at_unique_count = self.unique_count
+                    merged_profile.sample_size = self.sample_size
+                else:
+                    merged_profile._stopped_at_unique_ratio = other.unique_ratio
+                    merged_profile._stopped_at_unique_count = other.unique_count
+                    merged_profile.sample_size = other.sample_size
+
+                # If either profile has hit stop condition, remove categories dict
+                merged_profile._categories = {}
+                merged_profile._stop_condition_is_met = True
+
+        else:
+            raise Exception(
+                "Unable to add two profiles: One is using count min sketch"
+                "and the other is using full."
+            )
 
         return merged_profile
```

A resolved review thread on the `cms_max_num_heavy_hitters` assignment carried one Contributor comment: "i'd say we could have assigned here and not output at merge (currently just passing through), but that can be a future refactor."
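The `else` branch above rejects mixing a sketch-based profile with a full-count one; a related constraint, surfaced by `_merge_categories_cms`, is that two sketches can only be merged when their dimensions agree. A toy illustration of why, using plain nested lists of counters rather than datasketches: merging is cell-wise addition of the two count tables, which is only defined when the shapes match.

```python
from typing import List

Table = List[List[int]]  # num_hashes rows of num_buckets counters


def merge_cms_tables(t1: Table, t2: Table) -> Table:
    """Cell-wise sum of two count-min tables (toy model of sketch merging)."""
    # If the shapes differ, bucket indices refer to different hash layouts,
    # so the cells are not comparable and the merge is rejected, mirroring
    # the incompatible-configuration error the profiler raises.
    if len(t1) != len(t2) or any(len(r1) != len(r2) for r1, r2 in zip(t1, t2)):
        raise ValueError("Incompatible sketch configuration")
    return [[a + b for a, b in zip(r1, r2)] for r1, r2 in zip(t1, t2)]
```

Because addition is exact on each cell, the merged table answers estimates as if all updates from both streams had gone into one sketch.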
```diff
@@ -323,6 +378,107 @@ def _update_stop_condition(self, data: DataFrame):
         self._stopped_at_unique_ratio = merged_unique_ratio
         self._stopped_at_unique_count = merged_unique_count
 
+    @BaseColumnProfiler._timeit(name="categories")
+    def _get_categories_cms(self, df_series, len_df):
+        """Return count min sketch and heavy hitters for both the batch and stream case.
+
+        :param df_series: Series currently being processed by categorical profiler
+        :type df_series: Series
+        :param len_df: the total number of samples in df_series
+        :type len_df: int
+        :return: cms, heavy_hitter_dict, missing_heavy_hitter_dict
+        """
+        cms = datasketches.count_min_sketch(self.cms_num_hashes, self.cms_num_buckets)
+        heavy_hitter_dict = defaultdict(int)
+        missing_heavy_hitter_dict = defaultdict(int)
+        for i, value in enumerate(df_series):
+            cms.update(value)
+            i_count = cms.get_estimate(value)
+            i_total_count = i_count + self.cms.get_estimate(value)
+            # approximate heavy-hitters
+            if i_count >= int(len_df / self._cms_max_num_heavy_hitters):
+                heavy_hitter_dict[value] = i_count
+                missing_heavy_hitter_dict.pop(value, None)
+            elif i_total_count >= int(
+                (self.sample_size + len_df) / self._cms_max_num_heavy_hitters
+            ):
+                missing_heavy_hitter_dict[value] = i_total_count
+                heavy_hitter_dict.pop(value, None)
+
+        return cms, heavy_hitter_dict, missing_heavy_hitter_dict
+
+    @BaseColumnProfiler._timeit(name="categories")
+    def _merge_categories_cms(
+        self,
+        cms1,
+        heavy_hitter_dict1,
+        len1,
+        missing_heavy_hitter_dict,
+        cms2,
+        heavy_hitter_dict2,
+        len2,
+        max_num_heavy_hitters,
+    ):
+        """Return the aggregate count min sketch and approximate histogram (categories).
+
+        :param cms1: count min sketch
+        :type cms1: datasketches.countminsketch
+        :param cms2: count min sketch
+        :type cms2: datasketches.countminsketch
+        :param heavy_hitter_dict1: Heavy Hitters category count
+        :type heavy_hitter_dict1: Dict
+        :param heavy_hitter_dict2: Heavy Hitters category count
+        :type heavy_hitter_dict2: Dict
+        :param missing_heavy_hitter_dict: Heavy Hitters category count
+            considering two batches are two chunks of a data stream
+        :type missing_heavy_hitter_dict: Dict
+        :param len1: number of samples in batch 1
+        :type len1: int
+        :param len2: number of samples in batch 2
+        :type len2: int
+        :param max_num_heavy_hitters: value used to define
+            the threshold for minimum frequency required by a category to be counted
+        :type max_num_heavy_hitters: int
+        :return: cms1, categories, max_num_heavy_hitters
+        """
+        try:
+            cms3 = datasketches.count_min_sketch(
+                self.cms_num_hashes, self.cms_num_buckets
+            )
+            cms3.merge(cms1)
+            cms3.merge(cms2)
+        except ValueError as err:
+            raise err(
+                """Incompatible sketch configuration. When merging two sketches,
+                they must have the same number of buckets and hashes,
+                which are defined by cms_confidence and cms_relative_error options,
+                respectively."""
+            )
+
+        # re-collecting the estimates of non intersecting categories before
+        # re-applying heavy-hitters to the aggregate profile.
+        heavy_hitter_dict1 = heavy_hitter_dict1.copy()
+        heavy_hitter_dict2 = heavy_hitter_dict2.copy()
+        for k in (x for x in heavy_hitter_dict1 if x not in heavy_hitter_dict2):
+            heavy_hitter_dict2[k] = cms2.get_estimate(k)
+        for k in (x for x in heavy_hitter_dict2 if x not in heavy_hitter_dict1):
+            heavy_hitter_dict1[k] = cms1.get_estimate(k)
+
+        categories = utils.add_nested_dictionaries(
+            heavy_hitter_dict2, heavy_hitter_dict1
+        )
+
+        # This is a catch-all for edge cases where per-batch heavy hitters
+        # underestimates frequencies compared to treating the batches as
+        # chunks of the same stream.
+        categories.update(missing_heavy_hitter_dict)
+
+        total_samples = len1 + len2
+        for cat in list(categories):
+            if categories[cat] < (total_samples / max_num_heavy_hitters):
+                categories.pop(cat)
+        return cms3, categories, max_num_heavy_hitters
+
     @BaseColumnProfiler._timeit(name="categories")
     def _update_categories(
         self,
```
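The pruning loop at the end of `_merge_categories_cms` keeps only categories whose estimated count clears `total_samples / max_num_heavy_hitters`. The same threshold rule applied to exact counts can be sketched with a hypothetical stdlib-only helper (`heavy_hitters` is not part of the profiler, only an illustration):

```python
from collections import Counter
from typing import Dict, Iterable


def heavy_hitters(values: Iterable[str], max_num_heavy_hitters: int) -> Dict[str, int]:
    """Keep categories whose count clears total_samples / max_num_heavy_hitters."""
    counts = Counter(values)
    total = sum(counts.values())
    threshold = total / max_num_heavy_hitters
    # smaller max_num_heavy_hitters -> higher bar -> fewer surviving categories
    return {cat: n for cat, n in counts.items() if n >= threshold}
```

For example, with 10 samples and `max_num_heavy_hitters=2` the bar is a count of 5, so only dominant categories survive; raising `max_num_heavy_hitters` to 5 lowers the bar to 2.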
```diff
@@ -345,13 +501,36 @@ def _update_categories(
         :type df_series: pandas.DataFrame
         :return: None
         """
-        category_count = df_series.value_counts(dropna=False).to_dict()
-        self._categories = utils.add_nested_dictionaries(
-            self._categories, category_count
-        )
-        self._update_stop_condition(df_series)
-        if self._stop_condition_is_met:
-            self._categories = {}
+        if self.cms is not None:
+            if self._cms_max_num_heavy_hitters is None:
+                raise ValueError(
+                    "when using CMS, cms_max_num_heavy_hitters must be an integer"
+                )
+            len_df = len(df_series)
+            (
+                cms,
+                heavy_hitter_dict,
+                missing_heavy_hitter_dict,
+            ) = self._get_categories_cms(df_series, len_df)
+
+            self.cms, self._categories, _ = self._merge_categories_cms(
+                cms,
+                heavy_hitter_dict,
+                len_df,
+                missing_heavy_hitter_dict,
+                self.cms,
+                self._categories,
+                self.sample_size,
+                self._cms_max_num_heavy_hitters,
+            )
+        else:
+            category_count = df_series.value_counts(dropna=False).to_dict()
+            self._categories = utils.add_nested_dictionaries(
+                self._categories, category_count
+            )
+            self._update_stop_condition(df_series)
+            if self._stop_condition_is_met:
+                self._categories = {}
 
     def _update_helper(self, df_series_clean: Series, profile: dict) -> None:
         """
```
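In the CMS branch of `_update_categories`, each batch gets a fresh sketch that is then folded into the running one, so streaming several batches should agree with profiling their concatenation. With exact counters standing in for sketches (a simplification: CMS estimates can differ slightly because of hash collisions), the flow looks like this:

```python
from collections import Counter
from typing import Iterable, List


def update_categories(running: "Counter[str]", batch: Iterable[str]) -> "Counter[str]":
    # mirror the profiler's flow: count the batch on its own,
    # then merge the per-batch counts into the running state
    return running + Counter(batch)


# stream the data in two batches
stream_state: "Counter[str]" = Counter()
batches: List[List[str]] = [["a", "b", "a"], ["b", "c"]]
for batch in batches:
    stream_state = update_categories(stream_state, batch)

# profile the same data as one batch
batch_state = update_categories(Counter(), [v for b in batches for v in b])
```

With exact counters the two states are identical; the heavy-hitter thresholds in `_get_categories_cms` exist precisely to keep the approximate version close to this ideal.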