Zyp: A compact transformation engine
A data model and implementation for a compact transformation engine
written in Python.

- Based on JSON Pointer (RFC 6901), JMESPath, and transon
- Implemented using `attrs` and `cattrs`
- Includes built-in transformation functions `to_datetime` and
  `to_unixtime`
- Ability to marshal and unmarshal its representation to/from JSON and
  YAML
amotl committed Aug 11, 2024
1 parent 998ce02 commit d96f8f9
Showing 36 changed files with 1,681 additions and 12 deletions.
64 changes: 63 additions & 1 deletion .github/workflows/tests.yml
@@ -104,7 +104,7 @@ jobs:
pip install "setuptools>=64" --upgrade
# Install package in editable mode.
pip install --use-pep517 --prefer-binary --editable=.[develop,test,mongodb]
pip install --use-pep517 --prefer-binary --editable=.[mongodb,develop,test]
- name: Run linters and software tests
run: poe check
@@ -120,3 +120,65 @@ jobs:
        env_vars: OS,PYTHON
        name: codecov-umbrella
        fail_ci_if_error: true


  test-zyp:
    name: "
      Zyp: Python ${{ matrix.python-version }}
    "
    runs-on: ${{ matrix.os }}
    strategy:
      fail-fast: false
      matrix:
        os: ['ubuntu-latest']
        python-version: ['3.8', '3.9', '3.12']

    env:
      OS: ${{ matrix.os }}
      PYTHON: ${{ matrix.python-version }}

    steps:

    - name: Acquire sources
      uses: actions/checkout@v4

    - name: Set up Python ${{ matrix.python-version }}
      uses: actions/setup-python@v5
      with:
        python-version: ${{ matrix.python-version }}
        architecture: x64
        cache: 'pip'
        cache-dependency-path:
          pyproject.toml

    - name: Set up project
      run: |
        # `setuptools 0.64.0` adds support for editable install hooks (PEP 660).
        # https://github.com/pypa/setuptools/blob/main/CHANGES.rst#v6400
        pip install "setuptools>=64" --upgrade
        # Install package in editable mode.
        pip install --use-pep517 --prefer-binary --editable=.[zyp,develop,test]

    - name: Set timezone
      uses: szenius/set-timezone@v2.0
      with:
        timezoneLinux: "Europe/Berlin"
        timezoneMacos: "Europe/Berlin"
        timezoneWindows: "European Standard Time"

    - name: Run linters and software tests
      run: poe check

    # https://github.com/codecov/codecov-action
    - name: Upload coverage results to Codecov
      uses: codecov/codecov-action@v4
      env:
        CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
      with:
        files: ./coverage.xml
        flags: zyp
        env_vars: OS,PYTHON
        name: codecov-umbrella
        fail_ci_if_error: true
2 changes: 2 additions & 0 deletions CHANGES.md
@@ -1,6 +1,8 @@
# Changelog

## Unreleased
- Added `BucketTransformation`, a minimal transformation engine
  based on JSON Pointer (RFC 6901).
- Added documentation using Sphinx and Read the Docs

## 2024/08/05 v0.0.3
15 changes: 10 additions & 5 deletions README.md
@@ -10,7 +10,6 @@
[![License](https://img.shields.io/pypi/l/commons-codec.svg)](https://pypi.org/project/commons-codec/)

## About
-Data decoding, encoding, conversion, and translation utilities.

> A codec is a device or computer program that encodes or decodes a data stream or signal.
> Codec is a portmanteau of coder/decoder.
@@ -21,13 +20,18 @@ Data decoding, encoding, conversion, and translation utilities.
> -- https://en.wikipedia.org/wiki/Codec

## What's Inside
-- **Decoders:** A collection of reusable utilities with minimal dependencies for
-  transcoding purposes, mostly collected from other projects like
+- [Zyp], a generic and compact **transformation engine** written in Python, for data
+  decoding, encoding, conversion, translation, transformation, and cleansing purposes,
+  to be used as a data processing pipeline element.
+
+- **Transformer** components for converging [Change Data Capture (CDC)] messages to
+  SQL statements.
+
+- A collection of reusable utilities with minimal dependencies for
+  **decoding and transcoding** purposes, mostly collected from other projects like
  [Kotori](https://kotori.readthedocs.io/) and [LorryStream](https://lorrystream.readthedocs.io/),
  in order to provide them as a standalone package for broader use cases.

-- Transformers for [Change Data Capture (CDC)] messages to SQL statements.
-
## Installation
The package is available from [PyPI] at [commons-codec].
To install the most recent version, run:
@@ -74,3 +78,4 @@ within the header sections of relevant files.
[managed on GitHub]: https://github.com/daq-tools/commons-codec
[prior art]: https://commons-codec.readthedocs.io/prior-art.html
[PyPI]: https://pypi.org/
+[Zyp]: https://commons-codec.readthedocs.io/zyp/
3 changes: 3 additions & 0 deletions doc/backlog.md
@@ -12,3 +12,6 @@
- [ ] MongoDB: Implement stream resumption using `start_after`
- [ ] Feature: Filter by events, e.g. Ignore "delete" events?
- [ ] Integration Testing the "example" programs?
- [ ] Improve capabilities of DMS translator
  https://github.com/daq-tools/commons-codec/issues/11
- https://github.com/supabase/pg_replicate
3 changes: 2 additions & 1 deletion doc/index.md
@@ -23,7 +23,7 @@
| [LorryStream]

```{include} readme.md
-:start-line: 12
+:start-line: 11
```


@@ -32,6 +32,7 @@
:caption: Documentation
:hidden:
zyp/index
cdc/index
decode
```
48 changes: 48 additions & 0 deletions doc/zyp/backlog.md
@@ -0,0 +1,48 @@
# Zyp Backlog

## Iteration +1
- Refactor module namespace to `zyp`
- Documentation
- CLI interface
- Apply to MongoDB Table Loader in CrateDB Toolkit

## Iteration +2
Demonstrate!
- math expressions
- omit key (recursively)
- combine keys
- filter on keys and/or values
- Pathological cases like "Not defined" in typed fields like `TIMESTAMP`
- Use simpleeval, like Meltano, and provide the same built-in functions
  (see the sketch after this list)
  - https://sdk.meltano.com/en/v0.39.1/stream_maps.html#other-built-in-functions-and-names
  - https://github.com/MeltanoLabs/meltano-map-transform/pull/255
  - https://github.com/MeltanoLabs/meltano-map-transform/issues/252
- Use JSONPath, see https://sdk.meltano.com/en/v0.39.1/code_samples.html#use-a-jsonpath-expression-to-extract-the-next-page-url-from-a-hateoas-response
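
A rough sketch of what evaluating such math expressions with simpleeval could
look like. Only the `simple_eval` call is the library's actual API; how such
expressions would be wired into Zyp is exactly what this backlog item is about.

```python
from simpleeval import simple_eval

# Evaluate a user-supplied math expression against one record's fields.
# The wiring into Zyp is an open backlog item; this shows the library call only.
record = {"value": 4242}
print(simple_eval("value / 100", names=record))  # 42.42
```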

## Iteration +3
- Moksha transformations on Buckets
- Investigate using JSON Schema
- Fluent API interface
- https://github.com/Halvani/alphabetic
- Mappers do not support external API lookups.
  To add external API lookups, you can either (a) land all your data and
  then perform joins using a transformation tool like dbt, or (b) create a
  custom mapper plugin with inline lookup logic.
  => Example from Luftdatenpumpe, using a reverse geocoder
- [ ] Define schema
https://sdk.meltano.com/en/latest/typing.html
- https://docs.meltano.com/guide/v2-migration/#migrate-to-an-adapter-specific-dbt-transformer
- https://github.com/meltano/sdk/blob/v0.39.1/singer_sdk/mapper.py

## Fluent API Interface

```python

from zyp.model.fluent import FluentTransformation

transformation = (
    FluentTransformation()
    .jmes("records[?starts_with(location, 'B')]")
    .rename_fields({"_id": "id"})
    .convert_values({"/id": "int", "/value": "float"}, type="pointer-python")
    .jq(".[] |= (.value /= 100)")
)
```
194 changes: 194 additions & 0 deletions doc/zyp/index.md
@@ -0,0 +1,194 @@
# Zyp Transformations

## About
A data model and implementation for a compact transformation engine written
in [Python], based on [JSON Pointer] (RFC 6901), [JMESPath], and [transon],
implemented using [attrs] and [cattrs].
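
If you haven't worked with JSON Pointer before, here is a minimal illustration
of the addressing scheme, using the third-party `jsonpointer` package. This
only demonstrates RFC 6901 semantics; it is not necessarily the machinery Zyp
uses internally.

```python
# Minimal JSON Pointer (RFC 6901) illustration, shown with the third-party
# `jsonpointer` package. Demonstrates the addressing scheme only; not
# necessarily what Zyp uses internally.
from jsonpointer import resolve_pointer

doc = {"data": {"value": 4242}, "meta": {"name": "foo"}}
print(resolve_pointer(doc, "/data/value"))  # 4242
print(resolve_pointer(doc, "/meta/name"))   # foo
```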

## Ideas
:Conciseness:
  Define a multistep data refinement process with as little code as possible.
:Low Footprint:
  Doesn't need any infrastructure or pipeline framework. It's just a little library.
:Interoperability:
  Marshal the transformation recipe definition to/from text-only representations
  (JSON, YAML), in order to encourage implementations in other languages.
:Performance:
  Well, it is written in Python. Fragments can be rewritten in Rust where applicable.
:Immediate:
  Other ETL frameworks and concepts often need to land your data in the target
  system first, before applying subsequent transformations. Zyp works directly
  within the data pipeline, before data is inserted into the target system.

## Synopsis I
A basic transformation example for individual data records.

```python
from zyp.model.bucket import BucketTransformation, FieldRenamer, ValueConverter

# Consider a slightly messy collection of records.
data_in = [
    {"_id": "123", "name": "device-foo", "reading": "42.42"},
    {"_id": "456", "name": "device-bar", "reading": -84.01},
]

# Define a transformation that renames the `_id` field to `id`,
# casts its value to `int`, and casts the `reading` field to `float`.
transformation = BucketTransformation(
    names=FieldRenamer().add(old="_id", new="id"),
    values=ValueConverter()
    .add(pointer="/id", transformer="builtins.int")
    .add(pointer="/reading", transformer="builtins.float"),
)

for record in data_in:
    print(transformation.apply(record))
```
The result is a transformed data collection.
```json
[
  {"id": 123, "name": "device-foo", "reading": 42.42},
  {"id": 456, "name": "device-bar", "reading": -84.01}
]
```
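
Zyp also includes built-in transformation functions `to_datetime` and
`to_unixtime` (per the commit message). As a hypothetical sketch, one of them
might plug into a `ValueConverter` the same way as the `builtins.*`
transformers above; the dotted path used here is an assumption, not confirmed
by this page.

```python
from zyp.model.bucket import BucketTransformation, ValueConverter

# Hypothetical sketch: assumes the built-in `to_datetime` function can be
# addressed by a dotted path, analogous to `builtins.int` above. The module
# path `zyp.function.to_datetime` is an assumption.
transformation = BucketTransformation(
    values=ValueConverter().add(pointer="/timestamp", transformer="zyp.function.to_datetime"),
)
print(transformation.apply({"timestamp": "2024-08-11T10:00:00+02:00"}))
```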

## Synopsis II
A more advanced transformation example for a collection of data records.

Consider a messy collection of input data.
- The actual collection is nested within the top-level `records` item.
- `_id` fields are conveyed in string format.
- `value` fields include both integer and string values.
- `value` fields are fixed-point values, using a scaling factor of `100`.
- The collection includes invalid `null` records.
  Such records usually trip up processing, for example when filtering on object items.
```python
data_in = {
    "message-source": "system-3000",
    "message-type": "eai-warehouse",
    "records": [
        {"_id": "12", "meta": {"name": "foo", "location": "B"}, "data": {"value": "4242"}},
        None,
        {"_id": "34", "meta": {"name": "bar", "location": "BY"}, "data": {"value": -8401}},
        {"_id": "56", "meta": {"name": "baz", "location": "NI"}, "data": {"value": 2323}},
        {"_id": "78", "meta": {"name": "qux", "location": "NRW"}, "data": {"value": -580}},
        None,
        None,
    ],
}
```

After applying a corresponding transformation, the expected outcome is a
collection of valid records, optionally filtered, with values adjusted
according to the relevant type hints and other conversions.
```python
data_out = [
    {"id": 12, "meta": {"name": "foo", "location": "B"}, "data": {"value": 42.42}},
    {"id": 34, "meta": {"name": "bar", "location": "BY"}, "data": {"value": -84.01}},
]
```

Let's come up with relevant pre-processing rules to cleanse the input collection
and reshape it. To make this example more exciting, let's include two special needs:
- Filter the input collection by the value of a nested element.
- Rename top-level fields starting with an underscore `_`.

Other than those special rules, the fundamental ones to re-shape the data are:
- Unwrap the `records` attribute from the container dictionary into the actual collection.
- Filter the collection, both by omitting invalid/empty records and by applying query
  constraints.
- On each record, rename the top-level `_id` field to `id`.
- On each record, adjust the data types of the `id` and `value` fields.
- Postprocess the collection, applying a custom scaling factor to the `value` field.

Zyp lets you write those rules down concisely, using the Python language.

```python
from zyp.model.bucket import BucketTransformation, FieldRenamer, ValueConverter
from zyp.model.collection import CollectionTransformation
from zyp.model.moksha import MokshaTransformation

transformation = CollectionTransformation(
    pre=MokshaTransformation().jmes("records[?not_null(meta.location) && !starts_with(meta.location, 'N')]"),
    bucket=BucketTransformation(
        names=FieldRenamer().add(old="_id", new="id"),
        values=ValueConverter()
        .add(pointer="/id", transformer="builtins.int")
        .add(pointer="/data/value", transformer="builtins.float"),
    ),
    post=MokshaTransformation().jq(".[] |= (.data.value /= 100)"),
)

data_out = transformation.apply(data_in)
```
The `zyp-collection` transformation description can also be serialized,
for example into YAML format.
```python
print(transformation.to_yaml())
```
```yaml
meta:
  version: 1
  type: zyp-collection
pre:
  rules:
  - expression: records[?not_null(meta.location) && !starts_with(meta.location, 'N')]
    type: jmes
bucket:
  names:
    rules:
    - new: id
      old: _id
  values:
    rules:
    - args: []
      pointer: /id
      transformer: builtins.int
    - args: []
      pointer: /data/value
      transformer: builtins.float
post:
  rules:
  - expression: .[] |= (.data.value /= 100)
    type: jq
```
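
Since the Interoperability goal above covers unmarshalling as well, a round
trip should be possible. Here is a sketch, assuming a `from_yaml` counterpart
to `to_yaml` exists on `CollectionTransformation`; only `to_yaml` is shown on
this page, so treat the constructor name as an assumption.

```python
# Hypothetical round trip: assumes a `from_yaml` counterpart to `to_yaml`.
from zyp.model.collection import CollectionTransformation

restored = CollectionTransformation.from_yaml(transformation.to_yaml())
assert restored.apply(data_in) == data_out
```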
## Prior Art
- [Singer Transformer]
- [PipelineWise Transformations]
- [singer-transform]
- [Meltano Inline Data Mapping]
- [Meltano Inline Stream Maps]
- [AWS DMS source filter rules]
- [AWS DMS table selection and transformation rules]
- ... and many more. Thanks for the inspirations.
## Etymology
With kudos to [Kris Zyp] for conceiving [JSON Pointer].
## More
```{toctree}
:maxdepth: 1

research
backlog
```



[attrs]: https://www.attrs.org/
[AWS DMS source filter rules]: https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.Filters.html
[AWS DMS table selection and transformation rules]: https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.html
[cattrs]: https://catt.rs/
[Kris Zyp]: https://github.com/kriszyp
[JMESPath]: https://jmespath.org/
[JSON Pointer]: https://datatracker.ietf.org/doc/html/rfc6901
[Meltano Inline Data Mapping]: https://docs.meltano.com/guide/mappers/
[Meltano Inline Stream Maps]: https://sdk.meltano.com/en/latest/stream_maps.html
[PipelineWise Transformations]: https://transferwise.github.io/pipelinewise/user_guide/transformations.html
[Python]: https://en.wikipedia.org/wiki/Python_(programming_language)
[Singer Transformer]: https://github.com/singer-io/singer-python/blob/master/singer/transform.py
[singer-transform]: https://github.com/dkarzon/singer-transform
[transon]: https://transon-org.github.io/