Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
3d25015
WIP cms implementation
tyfarnan Jun 20, 2023
94e53ec
add heavy hitters implementation
tyfarnan Jun 20, 2023
1ff4d02
add heavy hitters implementation
tyfarnan Jun 20, 2023
8c750a6
WIP: mypy issue
tyfarnan Jun 21, 2023
2f27a92
WIP: mypy issue
tyfarnan Jun 21, 2023
b7feeb5
add cms bool and refactor options handler
tyfarnan Jun 21, 2023
466c76a
WIP: testing for CMS
tyfarnan Jun 21, 2023
aeb19ae
WIP: testing for CMS
tyfarnan Jun 21, 2023
e7f84d1
use new heavy_hitters_threshold, add test for it
tyfarnan Jun 21, 2023
3132b0e
feat: add dev to workflow for testing (#897)
JGSweets Jun 22, 2023
3b307b8
allow for hybrid handling of stream and batch for CMS
tyfarnan Jun 23, 2023
085024a
allow for hybrid handling of stream and batch for CMS
tyfarnan Jun 23, 2023
efa958b
add tests for edge cases and various ways of merging profiles with cms
tyfarnan Jun 23, 2023
05526c6
Merge branch 'dev' into cms_for_categorical
tyfarnan Jun 23, 2023
9aa87e7
include new CMS options, add test condition for them
tyfarnan Jun 26, 2023
abb4db8
var renames, add conditions for adding profiles, remove serialization
tyfarnan Jun 26, 2023
ddd212e
update tests for new conditions
tyfarnan Jun 26, 2023
3be6229
handle case where two profiles have different max_num_heavy_hitters
tyfarnan Jun 26, 2023
16e1247
change default test value for max_num_heavy_hitters
tyfarnan Jun 26, 2023
b91384b
move edge case condition inside _merge_categorical_cms, modify test t…
tyfarnan Jun 27, 2023
4666681
add tests for edge cases, refactor
tyfarnan Jun 27, 2023
dacbb1a
scipy req limit
tyfarnan Jun 27, 2023
f843a0f
update docstrings for new CMS params
tyfarnan Jun 27, 2023
7785d97
update default val in expected results
tyfarnan Jun 27, 2023
66f30a5
Merge branch 'dev' into cms_for_categorical
tyfarnan Jun 27, 2023
81ac139
Update dataprofiler/profilers/categorical_column_profile.py
tyfarnan Jun 27, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ repos:
networkx>=2.5.1,
typing-extensions>=3.10.0.2,
HLL>=2.0.3,
datasketches>=4.1.0,

# requirements-dev.txt
check-manifest>=0.48,
Expand Down Expand Up @@ -109,7 +110,7 @@ repos:
additional_dependencies: ['h5py', 'wheel', 'future', 'numpy', 'pandas',
'python-dateutil', 'pytz', 'pyarrow', 'chardet', 'fastavro',
'python-snappy', 'charset-normalizer', 'psutil', 'scipy', 'requests',
'networkx','typing-extensions', 'HLL']
'networkx','typing-extensions', 'HLL', 'datasketches']
# Pyupgrade - standardize and modernize Python syntax for newer versions of the language
- repo: https://github.com/asottile/pyupgrade
rev: v3.3.0
Expand Down
271 changes: 225 additions & 46 deletions dataprofiler/profilers/categorical_column_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from operator import itemgetter
from typing import cast

import datasketches
from pandas import DataFrame, Series

from . import BaseColumnProfiler, utils
Expand Down Expand Up @@ -52,6 +53,11 @@ def __init__(self, name: str | None, options: CategoricalOptions = None) -> None

self._stopped_at_unique_ratio: float | None = None
self._stopped_at_unique_count: int | None = None

self._cms_max_num_heavy_hitters: int | None = 5000
self.cms_num_hashes: int | None = None
self.cms_num_buckets: int | None = None
self.cms: datasketches.countminsketch | None = None
if options:
self._top_k_categories = options.top_k_categories
self.stop_condition_unique_value_ratio = (
Expand All @@ -61,6 +67,20 @@ def __init__(self, name: str | None, options: CategoricalOptions = None) -> None
options.max_sample_size_to_check_stop_condition
)

if options.cms:
self._cms_max_num_heavy_hitters = options.cms_max_num_heavy_hitters
self.cms_num_hashes = datasketches.count_min_sketch.suggest_num_hashes(
options.cms_confidence
)
self.cms_num_buckets = (
datasketches.count_min_sketch.suggest_num_buckets(
options.cms_relative_error
)
)
self.cms = datasketches.count_min_sketch(
self.cms_num_hashes, self.cms_num_buckets
)

def __add__(self, other: CategoricalColumn) -> CategoricalColumn:
"""
Merge the properties of two CategoricalColumn profiles.
Expand All @@ -83,53 +103,88 @@ def __add__(self, other: CategoricalColumn) -> CategoricalColumn:
self._merge_calculations(
merged_profile.__calculations, self.__calculations, other.__calculations
)
# If both profiles have not met stop condition
if not (self._stop_condition_is_met or other._stop_condition_is_met):
merged_profile._categories = utils.add_nested_dictionaries(
self._categories, other._categories

if self.cms and other.cms:

assert isinstance(self._cms_max_num_heavy_hitters, int)
assert isinstance(other._cms_max_num_heavy_hitters, int)
cms_max_num_heavy_hitters: int = min(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'd say we could have assigned here and not output at merge (currently just passing through), but that can be a future refactor.

self._cms_max_num_heavy_hitters, other._cms_max_num_heavy_hitters
)

# Transfer stop condition variables of 1st profile object to merged profile
# if they are not None else set to 2nd profile
profile1_product = self.sample_size * self.unique_ratio
profile2_product = other.sample_size * other.unique_ratio
if profile1_product < profile2_product:
merged_profile.max_sample_size_to_check_stop_condition = (
self.max_sample_size_to_check_stop_condition
)
merged_profile.stop_condition_unique_value_ratio = (
self.stop_condition_unique_value_ratio
)
else:
merged_profile.stop_condition_unique_value_ratio = (
other.stop_condition_unique_value_ratio
)
merged_profile.max_sample_size_to_check_stop_condition = (
other.max_sample_size_to_check_stop_condition
(
merged_profile.cms,
merged_profile._categories,
merged_profile._cms_max_num_heavy_hitters,
) = self._merge_categories_cms(
self.cms,
self._categories,
self.sample_size,
{},
other.cms,
other._categories,
other.sample_size,
cms_max_num_heavy_hitters,
)

elif not self.cms and not other.cms:
# If both profiles have not met stop condition
if not (self._stop_condition_is_met or other._stop_condition_is_met):
merged_profile._categories = utils.add_nested_dictionaries(
self._categories, other._categories
)

# Check merged profile w/ stop condition
if merged_profile._check_stop_condition_is_met(
merged_profile.sample_size, merged_profile.unique_ratio
):
merged_profile._stopped_at_unique_ratio = merged_profile.unique_ratio
merged_profile._stopped_at_unique_count = merged_profile.unique_count
# Transfer stop condition variables of 1st profile object to
# merged profile if they are not None else set to 2nd profile
profile1_product = self.sample_size * self.unique_ratio
profile2_product = other.sample_size * other.unique_ratio
if profile1_product < profile2_product:
merged_profile.max_sample_size_to_check_stop_condition = (
self.max_sample_size_to_check_stop_condition
)
merged_profile.stop_condition_unique_value_ratio = (
self.stop_condition_unique_value_ratio
)
else:
merged_profile.stop_condition_unique_value_ratio = (
other.stop_condition_unique_value_ratio
)
merged_profile.max_sample_size_to_check_stop_condition = (
other.max_sample_size_to_check_stop_condition
)

# Check merged profile w/ stop condition
if merged_profile._check_stop_condition_is_met(
merged_profile.sample_size, merged_profile.unique_ratio
):
merged_profile._stopped_at_unique_ratio = (
merged_profile.unique_ratio
)
merged_profile._stopped_at_unique_count = (
merged_profile.unique_count
)
merged_profile._categories = {}
merged_profile._stop_condition_is_met = True

else:
if self.sample_size > other.sample_size:
merged_profile._stopped_at_unique_ratio = self.unique_ratio
merged_profile._stopped_at_unique_count = self.unique_count
merged_profile.sample_size = self.sample_size
else:
merged_profile._stopped_at_unique_ratio = other.unique_ratio
merged_profile._stopped_at_unique_count = other.unique_count
merged_profile.sample_size = other.sample_size

# If either profile has hit stop condition, remove categories dict
merged_profile._categories = {}
merged_profile._stop_condition_is_met = True

else:
if self.sample_size > other.sample_size:
merged_profile._stopped_at_unique_ratio = self.unique_ratio
merged_profile._stopped_at_unique_count = self.unique_count
merged_profile.sample_size = self.sample_size
else:
merged_profile._stopped_at_unique_ratio = other.unique_ratio
merged_profile._stopped_at_unique_count = other.unique_count
merged_profile.sample_size = other.sample_size

# If either profile has hit stop condition, remove categories dict
merged_profile._categories = {}
merged_profile._stop_condition_is_met = True
raise Exception(
"Unable to add two profiles: One is using count min sketch"
"and the other is using full."
)

return merged_profile

Expand Down Expand Up @@ -323,6 +378,107 @@ def _update_stop_condition(self, data: DataFrame):
self._stopped_at_unique_ratio = merged_unique_ratio
self._stopped_at_unique_count = merged_unique_count

@BaseColumnProfiler._timeit(name="categories")
def _get_categories_cms(self, df_series, len_df):
    """Return count min sketch and heavy hitters for both the batch and stream case.

    Updates a fresh count-min sketch with every value in ``df_series`` and
    collects two heavy-hitter dicts: one considering this batch alone, and
    one considering the batch as the next chunk of the stream already
    accumulated in ``self.cms``.

    :param df_series: Series currently being processed by categorical profiler
    :type df_series: Series
    :param len_df: the total number of samples in df_series
    :type len_df: int
    :return: cms, heavy_hitter_dict, missing_heavy_hitter_dict
    """
    cms = datasketches.count_min_sketch(self.cms_num_hashes, self.cms_num_buckets)
    heavy_hitter_dict = defaultdict(int)
    missing_heavy_hitter_dict = defaultdict(int)

    # Heavy-hitter thresholds are loop invariants: a category qualifies once
    # its estimated count reaches (num samples / max_num_heavy_hitters).
    # Hoisted out of the loop instead of recomputing per element.
    batch_threshold = int(len_df / self._cms_max_num_heavy_hitters)
    stream_threshold = int(
        (self.sample_size + len_df) / self._cms_max_num_heavy_hitters
    )

    for value in df_series:
        cms.update(value)
        batch_count = cms.get_estimate(value)
        # estimate for the whole stream seen so far (prior sketch + this batch)
        total_count = batch_count + self.cms.get_estimate(value)
        # approximate heavy-hitters within this batch alone
        if batch_count >= batch_threshold:
            heavy_hitter_dict[value] = batch_count
            missing_heavy_hitter_dict.pop(value, None)
        # heavy-hitter only when this batch is treated as a chunk of the
        # full stream — would be missed by the batch-only check above
        elif total_count >= stream_threshold:
            missing_heavy_hitter_dict[value] = total_count
            heavy_hitter_dict.pop(value, None)

    return cms, heavy_hitter_dict, missing_heavy_hitter_dict

@BaseColumnProfiler._timeit(name="categories")
def _merge_categories_cms(
    self,
    cms1,
    heavy_hitter_dict1,
    len1,
    missing_heavy_hitter_dict,
    cms2,
    heavy_hitter_dict2,
    len2,
    max_num_heavy_hitters,
):
    """Return the aggregate count min sketch and approximate histogram (categories).

    :param cms1: count min sketch
    :type cms1: datasketches.countminsketch
    :param cms2: count min sketch
    :type cms2: datasketches.countminsketch
    :param heavy_hitter_dict1: Heavy Hitters category count
    :type heavy_hitter_dict1: Dict
    :param heavy_hitter_dict2: Heavy Hitters category count
    :type heavy_hitter_dict2: Dict
    :param missing_heavy_hitter_dict: Heavy Hitters category count
        considering two batches are two chunks of a data stream
    :type missing_heavy_hitter_dict: Dict
    :param len1: number of samples in batch 1
    :type len1: int
    :param len2: number of samples in batch 2
    :type len2: int
    :param max_num_heavy_hitters: value used to define
        the threshold for minimum frequency required by a category to be counted
    :type max_num_heavy_hitters: int
    :return: cms3, categories, max_num_heavy_hitters
    :raises ValueError: if the two sketches have incompatible configurations
        (different number of buckets/hashes) and cannot be merged
    """
    try:
        cms3 = datasketches.count_min_sketch(
            self.cms_num_hashes, self.cms_num_buckets
        )
        cms3.merge(cms1)
        cms3.merge(cms2)
    except ValueError as err:
        # BUGFIX: previously `raise err(...)` — calling the caught exception
        # *instance* raises `TypeError: 'ValueError' object is not callable`
        # and the intended message was never shown. Re-raise a new ValueError
        # chained to the original instead.
        raise ValueError(
            """Incompatible sketch configuration. When merging two sketches,
            they must have the same number of buckets and hashes,
            which are defined by cms_confidence and cms_relative_error options,
            respectively."""
        ) from err

    # re-collecting the estimates of non intersecting categories before
    # re-applying heavy-hitters to the aggregate profile. Copies avoid
    # mutating the callers' dicts.
    heavy_hitter_dict1 = heavy_hitter_dict1.copy()
    heavy_hitter_dict2 = heavy_hitter_dict2.copy()
    # NOTE: the first loop adds dict1-only keys into dict2, so the second
    # loop's `not in heavy_hitter_dict1` filter still selects exactly the
    # keys that were originally only in dict2.
    for k in (x for x in heavy_hitter_dict1 if x not in heavy_hitter_dict2):
        heavy_hitter_dict2[k] = cms2.get_estimate(k)
    for k in (x for x in heavy_hitter_dict2 if x not in heavy_hitter_dict1):
        heavy_hitter_dict1[k] = cms1.get_estimate(k)

    categories = utils.add_nested_dictionaries(
        heavy_hitter_dict2, heavy_hitter_dict1
    )

    # This is a catch all for edge cases where batch heavy hitters under
    # estimates frequencies compared to being treated as a sequence of
    # batches that are part of the same stream.
    categories.update(missing_heavy_hitter_dict)

    # Re-apply the heavy-hitter threshold over the merged sample size;
    # the minimum count is a loop invariant, so compute it once.
    total_samples = len1 + len2
    min_count = total_samples / max_num_heavy_hitters
    for cat in list(categories):
        if categories[cat] < min_count:
            categories.pop(cat)
    return cms3, categories, max_num_heavy_hitters

@BaseColumnProfiler._timeit(name="categories")
def _update_categories(
self,
Expand All @@ -345,13 +501,36 @@ def _update_categories(
:type df_series: pandas.DataFrame
:return: None
"""
category_count = df_series.value_counts(dropna=False).to_dict()
self._categories = utils.add_nested_dictionaries(
self._categories, category_count
)
self._update_stop_condition(df_series)
if self._stop_condition_is_met:
self._categories = {}
if self.cms is not None:
if self._cms_max_num_heavy_hitters is None:
raise ValueError(
"when using CMS, cms_max_num_heavy_hitters must be an integer"
)
len_df = len(df_series)
(
cms,
heavy_hitter_dict,
missing_heavy_hitter_dict,
) = self._get_categories_cms(df_series, len_df)

self.cms, self._categories, _ = self._merge_categories_cms(
cms,
heavy_hitter_dict,
len_df,
missing_heavy_hitter_dict,
self.cms,
self._categories,
self.sample_size,
self._cms_max_num_heavy_hitters,
)
else:
category_count = df_series.value_counts(dropna=False).to_dict()
self._categories = utils.add_nested_dictionaries(
self._categories, category_count
)
self._update_stop_condition(df_series)
if self._stop_condition_is_met:
self._categories = {}

def _update_helper(self, df_series_clean: Series, profile: dict) -> None:
"""
Expand Down
Loading