Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zyp Treatments: A slightly tailored transformation subsystem #46

Merged
merged 4 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## Unreleased
- Added Zyp Treatments, a slightly tailored transformation subsystem

## 2024/09/02 v0.0.14
- Replace poor man's relation name quoting with implementation
Expand Down Expand Up @@ -41,7 +42,7 @@
column. This allows defining primary keys on the sink table.

## 2024/08/14 v0.0.4
- Added `BucketTransformation`, a minimal transformation engine
- Added Zyp Transformations, a minimal transformation engine
based on JSON Pointer (RFC 6901).
- Added documentation using Sphinx and Read the Docs

Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ python_files = [
xfail_strict = true
markers = [
"dynamodb",
"integration",
"mongodb",
"tasmota",
"wemos",
Expand Down
4 changes: 4 additions & 0 deletions src/zyp/model/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@

from zyp.util.data import no_privates_no_nulls_no_empties

Record = t.Dict[str, t.Any]
Collection = t.List[Record]
DictOrList = t.Union[Record, Collection]


@define
class Metadata:
Expand Down
4 changes: 0 additions & 4 deletions src/zyp/model/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,7 @@
logger = logging.getLogger(__name__)


Record = t.Dict[str, t.Any]
Collection = t.List[Record]
DictOrList = t.Union[Record, Collection]
TransonTemplate = t.Dict[str, t.Any]

MokshaTransformer = t.Union[jmespath.parser.ParsedResult, jq._Program, transon.Transformer]


Expand Down
11 changes: 8 additions & 3 deletions src/zyp/model/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

from attrs import define

from zyp.model.base import Dumpable, Metadata, SchemaDefinition
from zyp.model.bucket import BucketTransformation, Collection, DictOrList
from zyp.model.base import Collection, DictOrList, Dumpable, Metadata, SchemaDefinition
from zyp.model.bucket import BucketTransformation
from zyp.model.moksha import MokshaTransformation
from zyp.model.treatment import Treatment


@define(frozen=True)
Expand All @@ -21,16 +22,20 @@ class CollectionTransformation(Dumpable):
pre: t.Union[MokshaTransformation, None] = None
bucket: t.Union[BucketTransformation, None] = None
post: t.Union[MokshaTransformation, None] = None
treatment: t.Union[Treatment, None] = None

def apply(self, data: DictOrList) -> Collection:
collection = t.cast(Collection, data)
collection_out = collection
if self.pre:
collection = t.cast(Collection, self.pre.apply(collection))
collection_out: Collection = []
if self.bucket:
collection_out = []
for item in collection:
item = self.bucket.apply(item)
collection_out.append(item)
if self.post:
collection_out = t.cast(Collection, self.post.apply(collection_out))
if self.treatment:
collection_out = t.cast(Collection, self.treatment.apply(collection_out))
return collection_out
3 changes: 2 additions & 1 deletion src/zyp/model/moksha.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from attr import Factory
from attrs import define

from zyp.model.bucket import ConverterBase, DictOrList, MokshaTransformer, TransonTemplate
from zyp.model.base import DictOrList
from zyp.model.bucket import ConverterBase, MokshaTransformer, TransonTemplate
from zyp.util.expression import compile_expression


Expand Down
131 changes: 131 additions & 0 deletions src/zyp/model/treatment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import builtins
import typing as t

from attr import Factory
from attrs import define

from zyp.model.base import Collection, DictOrList, Dumpable, Record


@define
class Treatment(Dumpable):
    """
    A set of record-level fix-up rules, applied recursively to nested data.

    Each attribute switches on or parameterizes one rule; see `apply_record`
    for the order in which the rules are evaluated.
    """

    ignore_complex_lists: bool = False
    ignore_field: t.List[str] = Factory(list)
    convert_list: t.List[str] = Factory(list)
    convert_string: t.List[str] = Factory(list)
    convert_dict: t.List[t.Dict[str, str]] = Factory(list)
    normalize_complex_lists: bool = False
    prune_invalid_date: t.List[str] = Factory(list)

    def apply(self, data: DictOrList) -> DictOrList:
        """
        Walk `data` recursively and treat every dictionary found on the way.

        Dictionaries are first treated in place by `apply_record`, then a new
        dictionary with recursively treated values is returned. Lists yield a
        new list of treated elements. Any other value is returned unchanged.
        """
        if isinstance(data, list):
            return t.cast(list, [self.apply(element) for element in data])
        if not isinstance(data, dict):
            return data
        self.apply_record(data)
        return {key: self.apply(value) for key, value in data.items()}

    def apply_record(self, data: Record) -> Record:
        """
        Apply all configured rules to a single record, modifying it in place.

        Returns the very same `data` object for convenience.
        """
        # Collect keys holding lists of complex objects, when requested.
        complex_list_keys = []
        if self.ignore_complex_lists:
            for key, value in data.items():
                if not self.is_list_of_dicts(value):
                    continue
                head = value[0]
                # Special-encoded items (first key starts with `$`) are not ignored.
                if head and next(iter(head)).startswith("$"):
                    continue
                complex_list_keys.append(key)

        # Drop both the configured and the computed ignores.
        for key in self.ignore_field + complex_list_keys:
            data.pop(key, None)

        # Normalize lists of objects in place.
        if self.normalize_complex_lists:
            for value in data.values():
                if self.is_list_of_dicts(value):
                    ListOfVaryingObjectsNormalizer(value).apply()

        # Wrap selected items into a `list`, unless they already are one.
        for key in self.convert_list:
            if key in data and not isinstance(data[key], list):
                data[key] = [data[key]]

        # Cast selected items to `str`, unless they already are one.
        for key in self.convert_string:
            if key in data and not isinstance(data[key], str):
                data[key] = str(data[key])

        # Wrap selected items into a `dict`, unless they already are one.
        for rule in self.convert_dict:
            key = rule["name"]
            wrapper_key = rule["wrapper_name"]
            if key in data and not isinstance(data[key], dict):
                data[key] = {wrapper_key: data[key]}

        # Remove date items that are either not a `dict` at all,
        # or carry their `date` member as a plain string.
        for key in self.prune_invalid_date:
            if key not in data:
                continue
            value = data[key]
            if not isinstance(value, dict) or isinstance(value.get("date"), str):
                del data[key]

        return data

    @staticmethod
    def is_list_of_dicts(v: t.Any) -> bool:
        """
        Whether `v` is a non-empty list whose first element is a dictionary.

        Only the first element is inspected.
        """
        if not isinstance(v, list) or not v:
            return False
        return isinstance(v[0], dict)


@define
class NormalizerRule:
    """
    A single normalization rule: which field to convert, and how.
    """

    # Name of the field (dictionary key) the rule applies to.
    name: str
    # Callable receiving the field's current value and returning its replacement.
    converter: t.Callable


@define
class ListOfVaryingObjectsNormalizer:
    """
    CrateDB cannot store lists of varying objects, so try to normalize them.

    For every field that occurs with more than one value type across the
    objects in `data`, a converter is selected and applied to that field in
    all objects. The objects are modified in place.
    """

    data: Collection

    def apply(self):
        """Collect type statistics, derive rules from them, and apply the rules in place."""
        self.apply_rules(self.get_rules(self.type_stats()))

    def apply_rules(self, rules: t.List[NormalizerRule]) -> None:
        """
        Run each rule's converter on the matching field of every object.

        `None` values are left untouched: passing them to `float` would raise
        `TypeError`, and passing them to `str` would turn a missing value into
        the literal text "None".
        """
        for item in self.data:
            # Only the first list element is validated upstream, so be lenient here.
            if not isinstance(item, dict):
                continue
            for rule in rules:
                value = item.get(rule.name)
                if value is not None:
                    item[rule.name] = rule.converter(value)

    def get_rules(self, statistics) -> t.List[NormalizerRule]:
        """
        Create one rule per field that was observed with more than one distinct type.

        `statistics` maps field names to the list of observed type names, as
        returned by `type_stats`.
        """
        return [
            NormalizerRule(name=name, converter=self.get_best_converter(types))
            for name, types in statistics.items()
            # Count distinct types, not occurrences: homogeneous fields need no rule.
            if len(set(types)) > 1
        ]

    def type_stats(self) -> t.Dict[str, t.List[str]]:
        """
        Map each field name to the type names of its values, one entry per occurrence.
        """
        types: t.Dict[str, t.List[str]] = {}
        for item in self.data:
            if not isinstance(item, dict):
                continue
            for key, value in item.items():
                types.setdefault(key, []).append(type(value).__name__)
        return types

    @staticmethod
    def get_best_converter(types: t.List[str]) -> t.Callable:
        """
        Select a converter for a field observed with the given type names.

        Strings win over everything else; a mix of `float` and `int` converges
        to `float`; any other combination is left as is.
        """
        if "str" in types:
            return builtins.str
        if "float" in types and "int" in types:
            return builtins.float
        return lambda x: x
1 change: 1 addition & 0 deletions tests/transform/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

RESET_TABLES = [
"from.dynamodb",
"from.generic",
]


Expand Down
1 change: 1 addition & 0 deletions tests/transform/test_dynamodb_full.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def test_to_sql_operation():
)


@pytest.mark.integration
def test_to_sql_cratedb(caplog, cratedb):
"""
Verify writing converted DynamoDB record to CrateDB.
Expand Down
41 changes: 41 additions & 0 deletions tests/transform/test_zyp_generic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import pytest

from commons_codec.model import SQLOperation
from zyp.model.treatment import Treatment


@pytest.mark.integration
def test_normalize_list_of_objects(caplog, cratedb):
    """
    Verify writing record to CrateDB, with transformations.

    The record contains lists of objects whose `abc` field varies in type.
    The treatment is expected to converge float/int to float and int/str to
    str before the record is inserted.
    """

    record_in = {
        "_list_float_int": [{"abc": 42.42}, {"abc": 42}],
        "_list_float_none": [{"id": 1, "abc": 42.42}, {"id": 2, "abc": None}],
        "_list_int_str": [{"abc": 123}, {"abc": "123"}],
    }

    # NOTE(review): the `None`-valued `abc` is absent from the expected result;
    # presumably the database does not return null members - confirm.
    record_out = {
        "_list_float_int": [{"abc": 42.42}, {"abc": 42.0}],
        "_list_float_none": [{"id": 1, "abc": 42.42}, {"id": 2}],
        "_list_int_str": [{"abc": "123"}, {"abc": "123"}],
    }

    # Define CrateDB SQL DDL and DML operations (SQL+parameters).
    operation_ddl = SQLOperation('CREATE TABLE "from".generic (data OBJECT(DYNAMIC))', None)
    operation_dml = SQLOperation('INSERT INTO "from".generic (data) VALUES (:data)', {"data": record_in})

    # Apply treatment to parameters. Use the returned value instead of relying
    # on the in-place mutation of the input, like `CollectionTransformation.apply` does.
    parameters = Treatment(normalize_complex_lists=True).apply(operation_dml.parameters)

    # Insert into CrateDB.
    cratedb.database.run_sql(operation_ddl.statement)
    cratedb.database.run_sql(operation_dml.statement, parameters)

    # Verify data in target database.
    assert cratedb.database.refresh_table("from.generic") is True

    results = cratedb.database.run_sql('SELECT * FROM "from".generic;', records=True)  # noqa: S608
    assert results[0]["data"] == record_out
21 changes: 17 additions & 4 deletions tests/zyp/test_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,31 @@ def test_collection_transformation_serialize():
}
dict_result = transformation.to_dict()
assert dict_result == transformation_dict
return

yaml_result = transformation.to_yaml()
assert yaml.full_load(yaml_result) == transformation_dict
CollectionTransformation.from_yaml(yaml_result)
transformation_second = CollectionTransformation.from_yaml(yaml_result)
assert isinstance(transformation_second, CollectionTransformation)


def test_collection_transformation_load_and_apply():
def test_collection_transformation_regular_load_and_apply():
"""
Verify transformation can be loaded from JSON and applied again.
Verify rule-based transformations can be loaded and applied.
"""
payload = Path("tests/zyp/transformation-collection.yaml").read_text()
transformation = CollectionTransformation.from_yaml(payload)
result = transformation.apply(deepcopy(ComplexRecipe.data_in))
assert result == ComplexRecipe.data_out


def test_collection_transformation_treatment_load_and_apply():
    """
    Verify a collection transformation that includes a treatment can be
    loaded from its YAML definition and applied to the reference input.
    """
    recipe_path = Path("tests/zyp/transformation-collection-treatment.yaml")
    transformation = CollectionTransformation.from_yaml(recipe_path.read_text())
    expected = {
        "message-source": "system-3000",
        "message-type": "eai-warehouse",
    }
    # Work on a copy, so the shared reference input stays pristine.
    assert transformation.apply(deepcopy(ComplexRecipe.data_in)) == expected
Loading