Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Airbyte CDK: add filter to RemoveFields #35326

Merged
2 changes: 1 addition & 1 deletion airbyte-cdk/python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pip install -e ".[dev]" # [dev] installs development-only dependencies
If the iteration you are working on includes changes to the models, you might want to regenerate them. In order to do that, you can run:

```bash
./gradlew :airbyte-cdk:python:format
./gradlew :airbyte-cdk:python:build
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems like a docs mistake from previous things, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the gradle format task does not exist anymore. `airbyte-ci format fix` should be used instead.

```

This will generate the files based on the schemas, add the license information and format the code. If you want to only do the former and rely on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1847,6 +1847,19 @@ definitions:
type:
type: string
enum: [RemoveFields]
condition:
artem1205 marked this conversation as resolved.
Show resolved Hide resolved
description: The predicate to filter a property by a property value. Property will be removed if it is empty OR expression is evaluated to True.
type: string
default: ""
interpolation_context:
- config
- property
- parameters
examples:
- "{{ property|string == '' }}"
- "{{ property is integer }}"
- "{{ property|length > 5 }}"
- "{{ property == 'some_string_to_match' }}"
field_pointers:
title: Field Paths
description: Array of paths defining the field to remove. Each item is an array whose field describe the path of a field to remove.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,16 @@ class SchemaNormalization(Enum):

class RemoveFields(BaseModel):
type: Literal['RemoveFields']
condition: Optional[str] = Field(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was this file modified manually? we should run gradle :airbyte-cdk:python:build or airbyte-cdk/python/bin/generate-component-manifest-files.sh to generate it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL, btw.

'',
description="The predicate to filter a property by a property value. Property will be removed if it is empty OR expression is evaluated to True.",
examples=[
"{{ property|string == '' }}",
'{{ property is integer }}',
'{{ property|length > 5 }}',
"{{ property == 'some_string_to_match' }}",
],
)
field_pointers: List[List[str]] = Field(
...,
description='Array of paths defining the field to remove. Each item is an array whose field describe the path of a field to remove.',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,7 @@ def create_record_selector(

@staticmethod
def create_remove_fields(model: RemoveFieldsModel, config: Config, **kwargs: Any) -> RemoveFields:
return RemoveFields(field_pointers=model.field_pointers, parameters={})
return RemoveFields(field_pointers=model.field_pointers, condition=model.condition or "", parameters={})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a unit test where condition is "" to verify nothing is filtered out by default?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added


def create_selective_authenticator(self, model: SelectiveAuthenticatorModel, config: Config, **kwargs: Any) -> DeclarativeAuthenticator:
authenticators = {name: self._create_component_from_model(model=auth, config=config) for name, auth in model.authenticators.items()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import dpath.exceptions
import dpath.util
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.declarative.types import Config, FieldPointer, StreamSlice, StreamState

Expand Down Expand Up @@ -40,6 +41,10 @@ class RemoveFields(RecordTransformation):

field_pointers: List[FieldPointer]
parameters: InitVar[Mapping[str, Any]]
condition: str = ""

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._filter_interpolator = InterpolatedBoolean(condition=self.condition, parameters=parameters)

def transform(
self,
Expand All @@ -55,7 +60,11 @@ def transform(
for pointer in self.field_pointers:
# the dpath library by default doesn't delete fields from arrays
try:
dpath.util.delete(record, pointer)
dpath.util.delete(
artem1205 marked this conversation as resolved.
Show resolved Hide resolved
record,
pointer,
afilter=(lambda x: self._filter_interpolator.eval(config or {}, property=x)) if self.condition else None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is afilter a typo, should it be filter? I don't fully understand the details, so I might be wrong on this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's just what the parameter is called in the dpath library https://pypi.org/project/dpath/

)
except dpath.exceptions.PathNotFound:
# if the (potentially nested) property does not exist, silently skip
pass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,39 +10,79 @@


@pytest.mark.parametrize(
["input_record", "field_pointers", "expected"],
["input_record", "field_pointers", "condition", "expected"],
[
pytest.param({"k1": "v", "k2": "v"}, [["k1"]], {"k2": "v"}, id="remove a field that exists (flat dict)"),
pytest.param({"k1": "v", "k2": "v"}, [["k3"]], {"k1": "v", "k2": "v"}, id="remove a field that doesn't exist (flat dict)"),
pytest.param({"k1": "v", "k2": "v"}, [["k1"], ["k2"]], {}, id="remove multiple fields that exist (flat dict)"),
pytest.param({"k1": "v", "k2": "v"}, [["k1"]], None, {"k2": "v"}, id="remove a field that exists (flat dict), condition = None"),
pytest.param({"k1": "v", "k2": "v"}, [["k1"]], "", {"k2": "v"}, id="remove a field that exists (flat dict)"),
pytest.param({"k1": "v", "k2": "v"}, [["k3"]], "", {"k1": "v", "k2": "v"}, id="remove a field that doesn't exist (flat dict)"),
pytest.param({"k1": "v", "k2": "v"}, [["k1"], ["k2"]], "", {}, id="remove multiple fields that exist (flat dict)"),
# TODO: should we instead splice the element out of the array? I think that's the more intuitive solution
# Otherwise one could just set the field's value to null.
pytest.param({"k1": [1, 2]}, [["k1", 0]], {"k1": [None, 2]}, id="remove field inside array (int index)"),
pytest.param({"k1": [1, 2]}, [["k1", "0"]], {"k1": [None, 2]}, id="remove field inside array (string index)"),
pytest.param({"k1": [1, 2]}, [["k1", 0]], "", {"k1": [None, 2]}, id="remove field inside array (int index)"),
pytest.param({"k1": [1, 2]}, [["k1", "0"]], "", {"k1": [None, 2]}, id="remove field inside array (string index)"),
pytest.param(
{"k1": "v", "k2": "v", "k3": [0, 1], "k4": "v"},
[["k1"], ["k2"], ["k3", 0]],
"",
{"k3": [None, 1], "k4": "v"},
id="test all cases (flat)",
),
pytest.param({"k1": [0, 1]}, [[".", "k1", 10]], {"k1": [0, 1]}, id="remove array index that doesn't exist (flat)"),
pytest.param({".": {"k1": [0, 1]}}, [[".", "k1", 10]], {".": {"k1": [0, 1]}}, id="remove array index that doesn't exist (nested)"),
pytest.param({".": {"k2": "v", "k1": "v"}}, [[".", "k1"]], {".": {"k2": "v"}}, id="remove nested field that exists"),
pytest.param({"k1": [0, 1]}, [[".", "k1", 10]], "", {"k1": [0, 1]}, id="remove array index that doesn't exist (flat)"),
pytest.param(
{".": {"k2": "v", "k1": "v"}}, [[".", "k3"]], {".": {"k2": "v", "k1": "v"}}, id="remove field that doesn't exist (nested)"
{".": {"k1": [0, 1]}}, [[".", "k1", 10]], "", {".": {"k1": [0, 1]}}, id="remove array index that doesn't exist (nested)"
),
pytest.param({".": {"k2": "v", "k1": "v"}}, [[".", "k1"], [".", "k2"]], {".": {}}, id="remove multiple fields that exist (nested)"),
pytest.param({".": {"k2": "v", "k1": "v"}}, [[".", "k1"]], "", {".": {"k2": "v"}}, id="remove nested field that exists"),
pytest.param(
{".": {"k1": [0, 1]}}, [[".", "k1", 0]], {".": {"k1": [None, 1]}}, id="remove multiple fields that exist in arrays (nested)"
{".": {"k2": "v", "k1": "v"}}, [[".", "k3"]], "", {".": {"k2": "v", "k1": "v"}}, id="remove field that doesn't exist (nested)"
),
pytest.param(
{".": {"k2": "v", "k1": "v"}}, [[".", "k1"], [".", "k2"]], "", {".": {}}, id="remove multiple fields that exist (nested)"
),
pytest.param(
{".": {"k1": [0, 1]}},
[[".", "k1", 0]],
"",
{".": {"k1": [None, 1]}},
id="remove multiple fields that exist in arrays (nested)",
),
pytest.param(
{".": {"k1": [{"k2": "v", "k3": "v"}, {"k4": "v"}]}},
[[".", "k1", 0, "k2"], [".", "k1", 1, "k4"]],
"",
{".": {"k1": [{"k3": "v"}, {}]}},
id="remove fields that exist in arrays (deeply nested)",
),
pytest.param(
{"k1": "v", "k2": "v"},
[["**"]],
"{{ False }}",
{"k1": "v", "k2": "v"},
id="do not remove any field if condition is boolean False",
),
pytest.param({"k1": "v", "k2": "v"}, [["**"]], "{{ True }}", {}, id="remove all field if condition is boolean True"),
pytest.param(
{"k1": "v", "k2": "v1", "k3": "v1", "k4": {"k_nested": "v1", "k_nested2": "v2"}},
[["**"]],
"{{ property == 'v1' }}",
{"k1": "v", "k4": {"k_nested2": "v2"}},
id="recursively remove any field that matches property condition and leave that does not",
),
pytest.param(
{"k1": "v", "k2": "some_long_string", "k3": "some_long_string", "k4": {"k_nested": "v1", "k_nested2": "v2"}},
[["**"]],
"{{ property|length > 5 }}",
{"k1": "v", "k4": {"k_nested": "v1", "k_nested2": "v2"}},
id="remove any field that have length > 5 and leave that does not",
),
pytest.param(
{"k1": 255, "k2": "some_string", "k3": "some_long_string", "k4": {"k_nested": 123123, "k_nested2": "v2"}},
[["**"]],
"{{ property is integer }}",
{"k2": "some_string", "k3": "some_long_string", "k4": {"k_nested2": "v2"}},
id="recursively remove any field that of type integer and leave that does not",
),
],
)
def test_remove_fields(input_record: Mapping[str, Any], field_pointers: List[FieldPointer], expected: Mapping[str, Any]):
transformation = RemoveFields(field_pointers=field_pointers, parameters={})
def test_remove_fields(input_record: Mapping[str, Any], field_pointers: List[FieldPointer], condition: str, expected: Mapping[str, Any]):
transformation = RemoveFields(field_pointers=field_pointers, condition=condition, parameters={})
assert transformation.transform(input_record) == expected
Loading