Skip to content

feat(low-code): added key_transformation to DpathFlattenFields #472

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2307,6 +2307,27 @@ definitions:
$parameters:
type: object
additionalProperties: true
# Schema for the key-renaming options referenced by DpathFlattenFields' `key_transformation`.
# Only `type` is required; `prefix` and `suffix` are optional strings.
KeyTransformation:
title: Transformation to apply for extracted object keys by Dpath Flatten Fields
type: object
required:
- type
properties:
type:
type: string
enum: [ KeyTransformation ]
# Text placed in front of each extracted key.
prefix:
title: Key Prefix
description: Prefix to add for object keys. If not provided original keys remain unchanged.
type: string
examples:
- flattened_
# Text placed after each extracted key.
suffix:
title: Key Suffix
description: Suffix to add for object keys. If not provided original keys remain unchanged.
type: string
examples:
- _flattened
DpathFlattenFields:
title: Dpath Flatten Fields
description: A transformation that flattens field values to the top of the record.
Expand Down Expand Up @@ -2335,6 +2356,11 @@ definitions:
title: Replace Origin Record
description: Whether to replace the origin record or not. Default is False.
type: boolean
key_transformation:
title: Key transformation
description: Transformation for object keys. If not provided, original key will be used.
type: object
"$ref": "#/definitions/KeyTransformation"
$parameters:
type: object
additionalProperties: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,25 @@ class FlattenFields(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class KeyTransformation(BaseModel):
    # Optional affixes for object keys; each one defaults to None when omitted.
    # (A `#` comment is used instead of a docstring so the model's schema description stays unset.)
    prefix: Optional[str] = Field(
        default=None,
        title="Key Prefix",
        description="Prefix to add for object keys. If not provided original keys remain unchanged.",
        examples=["flattened_"],
    )
    suffix: Optional[str] = Field(
        default=None,
        title="Key Suffix",
        description="Suffix to add for object keys. If not provided original keys remain unchanged.",
        examples=["_flattened"],
    )


class DpathFlattenFields(BaseModel):
type: Literal["DpathFlattenFields"]
field_path: List[str] = Field(
Expand All @@ -897,6 +916,11 @@ class DpathFlattenFields(BaseModel):
description="Whether to replace the origin record or not. Default is False.",
title="Replace Origin Record",
)
key_transformation: Optional[Union[KeyTransformation, None]] = Field(
None,
description="Transformation for object keys. If not provided, original key will be used.",
title="Key transformation",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,7 @@
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import (
DpathFlattenFields,
KeyTransformation,
)
from airbyte_cdk.sources.declarative.transformations.flatten_fields import (
FlattenFields,
Expand Down Expand Up @@ -790,13 +791,24 @@ def create_dpath_flatten_fields(
self, model: DpathFlattenFieldsModel, config: Config, **kwargs: Any
) -> DpathFlattenFields:
model_field_path: List[Union[InterpolatedString, str]] = [x for x in model.field_path]
key_transformation = (
KeyTransformation(
config=config,
prefix=model.key_transformation.prefix,
suffix=model.key_transformation.suffix,
parameters=model.parameters or {},
)
if model.key_transformation is not None
else None
)
return DpathFlattenFields(
config=config,
field_path=model_field_path,
delete_origin_value=model.delete_origin_value
if model.delete_origin_value is not None
else False,
replace_record=model.replace_record if model.replace_record is not None else False,
key_transformation=key_transformation,
parameters=model.parameters or {},
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,24 @@
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState


@dataclass
class KeyTransformation:
    """
    Prefix and/or suffix to attach to object keys.

    Each affix that is not None is treated as an interpolated string: it is evaluated once,
    at construction time, against `config` (with `parameters` supplied to the interpolation),
    and the attribute is overwritten with the evaluated result.
    """

    config: Config
    parameters: InitVar[Mapping[str, Any]]
    prefix: Optional[str] = None
    suffix: Optional[str] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Resolve the prefix first and the suffix second, keeping the original evaluation order.
        for affix_name in ("prefix", "suffix"):
            template = getattr(self, affix_name)
            if template is None:
                continue
            evaluated = InterpolatedString.create(template, parameters=parameters).eval(
                self.config
            )
            setattr(self, affix_name, evaluated)


@dataclass
class DpathFlattenFields(RecordTransformation):
"""
Expand All @@ -16,6 +34,7 @@ class DpathFlattenFields(RecordTransformation):
field_path: List[Union[InterpolatedString, str]] path to the field to flatten.
delete_origin_value: bool = False whether to delete origin field or keep it. Default is False.
replace_record: bool = False whether to replace origin record or not. Default is False.
key_transformation: KeyTransformation = None how to transform extracted object keys

"""

Expand All @@ -24,17 +43,35 @@ class DpathFlattenFields(RecordTransformation):
parameters: InitVar[Mapping[str, Any]]
delete_origin_value: bool = False
replace_record: bool = False
key_transformation: Optional[KeyTransformation] = None

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._parameters = parameters
self._field_path = [
InterpolatedString.create(path, parameters=parameters) for path in self.field_path
InterpolatedString.create(path, parameters=self._parameters) for path in self.field_path
]
for path_index in range(len(self.field_path)):
if isinstance(self.field_path[path_index], str):
self._field_path[path_index] = InterpolatedString.create(
self.field_path[path_index], parameters=parameters
self.field_path[path_index], parameters=self._parameters
)

def _apply_key_transformation(self, extracted: Mapping[str, Any]) -> Mapping[str, Any]:
if self.key_transformation:
if self.key_transformation.prefix:
extracted = {
f"{self.key_transformation.prefix}{key}": value
for key, value in extracted.items()
}

if self.key_transformation.suffix:
extracted = {
f"{key}{self.key_transformation.suffix}": value
for key, value in extracted.items()
}

return extracted

def transform(
self,
record: Dict[str, Any],
Expand All @@ -50,6 +87,8 @@ def transform(
extracted = dpath.get(record, path, default=[])

if isinstance(extracted, dict):
extracted = self._apply_key_transformation(extracted)

if self.replace_record and extracted:
dpath.delete(record, "**")
record.update(extracted)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
import pytest

from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import DpathFlattenFields
from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import (
DpathFlattenFields,
KeyTransformation,
)

# Stand-in value used for the fields of the test records below.
_ANY_VALUE = -1
# Readable names for the boolean `delete_origin_value` / `replace_record` arguments.
_DELETE_ORIGIN_VALUE = True
_REPLACE_WITH_VALUE = True
_DO_NOT_DELETE_ORIGIN_VALUE = False
_DO_NOT_REPLACE_WITH_VALUE = False
# `None` markers: no prefix, no suffix, or no key transformation at all.
_NO_KEY_PREFIX = None
_NO_KEY_SUFFIX = None
_NO_KEY_TRANSFORMATIONS = None


@pytest.mark.parametrize(
Expand All @@ -16,6 +22,7 @@
"field_path",
"delete_origin_value",
"replace_record",
"key_transformation",
"expected_record",
],
[
Expand All @@ -25,6 +32,7 @@
["field2"],
_DO_NOT_DELETE_ORIGIN_VALUE,
_DO_NOT_REPLACE_WITH_VALUE,
_NO_KEY_TRANSFORMATIONS,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
id="flatten by dpath, don't delete origin value",
),
Expand All @@ -34,6 +42,7 @@
["field2"],
_DELETE_ORIGIN_VALUE,
_DO_NOT_REPLACE_WITH_VALUE,
_NO_KEY_TRANSFORMATIONS,
{"field1": _ANY_VALUE, "field3": _ANY_VALUE},
id="flatten by dpath, delete origin value",
),
Expand All @@ -46,6 +55,7 @@
["field2", "*", "field4"],
_DO_NOT_DELETE_ORIGIN_VALUE,
_DO_NOT_REPLACE_WITH_VALUE,
_NO_KEY_TRANSFORMATIONS,
{
"field1": _ANY_VALUE,
"field2": {"field3": {"field4": {"field5": _ANY_VALUE}}},
Expand All @@ -62,6 +72,7 @@
["field2", "*", "field4"],
_DELETE_ORIGIN_VALUE,
_DO_NOT_REPLACE_WITH_VALUE,
_NO_KEY_TRANSFORMATIONS,
{"field1": _ANY_VALUE, "field2": {"field3": {}}, "field5": _ANY_VALUE},
id="flatten by dpath with *, delete origin value",
),
Expand All @@ -71,6 +82,7 @@
["{{ config['field_path'] }}"],
_DO_NOT_DELETE_ORIGIN_VALUE,
_DO_NOT_REPLACE_WITH_VALUE,
_NO_KEY_TRANSFORMATIONS,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
id="flatten by dpath from config, don't delete origin value",
),
Expand All @@ -80,6 +92,7 @@
["non-existing-field"],
_DO_NOT_DELETE_ORIGIN_VALUE,
_DO_NOT_REPLACE_WITH_VALUE,
_NO_KEY_TRANSFORMATIONS,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
id="flatten by non-existing dpath, don't delete origin value",
),
Expand All @@ -89,6 +102,7 @@
["*", "non-existing-field"],
_DO_NOT_DELETE_ORIGIN_VALUE,
_DO_NOT_REPLACE_WITH_VALUE,
_NO_KEY_TRANSFORMATIONS,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
id="flatten by non-existing dpath with *, don't delete origin value",
),
Expand All @@ -98,6 +112,7 @@
["field2"],
_DO_NOT_DELETE_ORIGIN_VALUE,
_DO_NOT_REPLACE_WITH_VALUE,
_NO_KEY_TRANSFORMATIONS,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
id="flatten by dpath, not to update when record has field conflicts, don't delete origin value",
),
Expand All @@ -107,6 +122,7 @@
["field2"],
_DO_NOT_DELETE_ORIGIN_VALUE,
_DO_NOT_REPLACE_WITH_VALUE,
_NO_KEY_TRANSFORMATIONS,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
id="flatten by dpath, not to update when record has field conflicts, delete origin value",
),
Expand All @@ -116,6 +132,7 @@
["field2"],
_DO_NOT_DELETE_ORIGIN_VALUE,
_REPLACE_WITH_VALUE,
_NO_KEY_TRANSFORMATIONS,
{"field3": _ANY_VALUE},
id="flatten by dpath, replace with value",
),
Expand All @@ -125,20 +142,61 @@
["field2"],
_DELETE_ORIGIN_VALUE,
_REPLACE_WITH_VALUE,
_NO_KEY_TRANSFORMATIONS,
{"field3": _ANY_VALUE},
id="flatten by dpath, delete_origin_value do not affect to replace_record",
),
pytest.param(
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
{},
["field2"],
_DO_NOT_DELETE_ORIGIN_VALUE,
_REPLACE_WITH_VALUE,
("prefix_", _NO_KEY_SUFFIX),
{"prefix_field3": _ANY_VALUE},
id="flatten by dpath, not delete origin value, replace record, add keys prefix",
),
pytest.param(
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
{},
["field2"],
_DO_NOT_DELETE_ORIGIN_VALUE,
_REPLACE_WITH_VALUE,
(_NO_KEY_PREFIX, "_suffix"),
{"field3_suffix": _ANY_VALUE},
id="flatten by dpath, not delete origin value, replace record, add keys suffix",
),
pytest.param(
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
{},
["field2"],
_DO_NOT_DELETE_ORIGIN_VALUE,
_REPLACE_WITH_VALUE,
("prefix_", "_suffix"),
{"prefix_field3_suffix": _ANY_VALUE},
id="flatten by dpath, not delete origin value, replace record, add keys prefix and suffix",
),
],
)
def test_dpath_flatten_lists(
input_record, config, field_path, delete_origin_value, replace_record, expected_record
input_record,
config,
field_path,
delete_origin_value,
replace_record,
key_transformation,
expected_record,
):
if key_transformation:
key_transformation = KeyTransformation(config, {}, *key_transformation)

flattener = DpathFlattenFields(
field_path=field_path,
parameters={},
config=config,
delete_origin_value=delete_origin_value,
replace_record=replace_record,
key_transformation=key_transformation,
)
flattener.transform(input_record)
assert input_record == expected_record
Loading