Skip to content

Commit

Permalink
Zyp Treatments: Add normalize_complex_lists option
Browse files Browse the repository at this point in the history
Because CrateDB can not store lists of varying objects, try to normalize
them, currently biased towards strings.
  • Loading branch information
amotl committed Sep 7, 2024
1 parent d40cd82 commit 1859746
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 1 deletion.
63 changes: 62 additions & 1 deletion src/zyp/model/treatment.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import builtins
import typing as t

from attr import Factory
Expand All @@ -13,6 +14,7 @@ class Treatment(Dumpable):
convert_list: t.List[str] = Factory(list)
convert_string: t.List[str] = Factory(list)
convert_dict: t.List[t.Dict[str, str]] = Factory(list)
normalize_complex_lists: bool = False
prune_invalid_date: t.List[str] = Factory(list)

def apply(self, data: DictOrList) -> DictOrList:
Expand All @@ -28,7 +30,7 @@ def apply_record(self, data: Record) -> Record:
local_ignores = []
if self.ignore_complex_lists:
for k, v in data.items():
if isinstance(v, list) and v and isinstance(v[0], dict):
if self.is_list_of_dicts(v):
# Skip ignoring special-encoded items.
if v[0] and list(v[0].keys())[0].startswith("$"):
continue
Expand All @@ -39,6 +41,12 @@ def apply_record(self, data: Record) -> Record:
if ignore_name in data:
del data[ignore_name]

# Apply sanitation for lists of objects.
if self.normalize_complex_lists:
for _, v in data.items():
if self.is_list_of_dicts(v):
ListOfVaryingObjectsNormalizer(v).apply()

# Converge certain items to `list` even when defined differently.
for to_list_name in self.convert_list:
if to_list_name in data and not isinstance(data[to_list_name], list):
Expand Down Expand Up @@ -66,3 +74,56 @@ def apply_record(self, data: Record) -> Record:
del data[key]

return data

@staticmethod
def is_list_of_dicts(v: t.Any) -> bool:
return isinstance(v, list) and bool(v) and isinstance(v[0], dict)


@define
class SanitizerRule:
"""
Manage details of a sanitizer rule.
"""

name: str
converter: t.Callable


@define
class ListOfVaryingObjectsNormalizer:
"""
CrateDB can not store lists of varying objects, so try to normalize them.
"""

data: Collection

def apply(self):
self.apply_rules(self.get_rules(self.type_stats()))

def apply_rules(self, rules: t.List[SanitizerRule]) -> None:
for item in self.data:
for rule in rules:
name = rule.name
if name in item:
item[name] = rule.converter(item[name])

def get_rules(self, statistics) -> t.List[SanitizerRule]:
rules = []
for name, types in statistics.items():
if len(types) > 1:
rules.append(SanitizerRule(name=name, converter=self.get_best_converter(types)))
return rules

def type_stats(self) -> t.Dict[str, t.List[str]]:
types: t.Dict[str, t.List[str]] = {}
for item in self.data:
for key, value in item.items():
types.setdefault(key, []).append(type(value).__name__)
return types

@staticmethod
def get_best_converter(types: t.List[str]) -> t.Callable:
if "str" in types:
return builtins.str
return lambda x: x
13 changes: 13 additions & 0 deletions tests/zyp/test_treatment.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,19 @@ def test_treatment_ignore_fields():
assert transformation.apply([{"data": [{"abc": 123}]}]) == [{"data": [{}]}]


def test_treatment_normalize_complex_lists():
"""
Verify normalizing lists of objects works.
"""
transformation = Treatment(normalize_complex_lists=True)
assert transformation.apply([{"data": [{"abc": 123.42}, {"abc": 123}]}]) == [
{"data": [{"abc": 123.42}, {"abc": 123}]}
]
assert transformation.apply([{"data": [{"abc": 123}, {"abc": "123"}]}]) == [
{"data": [{"abc": "123"}, {"abc": "123"}]}
]


def test_treatment_convert_string():
"""
Verify treating nested data to convert values into strings works.
Expand Down

0 comments on commit 1859746

Please sign in to comment.