Skip to content

Commit

Permalink
Fix schema transformation false warnings. (airbytehq#7863)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmytro authored Nov 12, 2021
1 parent 0c41542 commit e56e86d
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 63 deletions.
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.1.35
Fix false warnings on record transform.

## 0.1.34
Fix logging inside source and streams

Expand Down
26 changes: 13 additions & 13 deletions airbyte-cdk/python/airbyte_cdk/sources/utils/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def __get_normalizer(self, schema_key: str, original_validator: Callable):
:original_validator: native jsonschema validator callback.
"""

def normalizator(validator_instance: Callable, val: Any, instance: Any, schema: Dict[str, Any]):
def normalizator(validator_instance: Callable, property_value: Any, instance: Any, schema: Dict[str, Any]):
"""
Jsonschema validator callable used for validating an instance. We
override the default Draft7Validator to perform value transformation
Expand All @@ -144,19 +144,19 @@ def resolve(subschema):
return resolved
return subschema

if schema_key == "type" and instance is not None:
if "object" in val and isinstance(instance, dict):
for k, subschema in schema.get("properties", {}).items():
if k in instance:
subschema = resolve(subschema)
instance[k] = self.__normalize(instance[k], subschema)
elif "array" in val and isinstance(instance, list):
subschema = schema.get("items", {})
subschema = resolve(subschema)
for index, item in enumerate(instance):
instance[index] = self.__normalize(item, subschema)
# Transform object and array values before running json schema type checking for each element.
if schema_key == "properties":
for k, subschema in property_value.items():
if k in (instance or {}):
subschema = resolve(subschema)
instance[k] = self.__normalize(instance[k], subschema)
elif schema_key == "items":
subschema = resolve(property_value)
for index, item in enumerate((instance or [])):
instance[index] = self.__normalize(item, subschema)

# Running native jsonschema traverse algorithm after field normalization is done.
yield from original_validator(validator_instance, val, instance, schema)
yield from original_validator(validator_instance, property_value, instance, schema)

return normalizator

Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.1.34",
version="0.1.35",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
88 changes: 39 additions & 49 deletions airbyte-cdk/python/unit_tests/sources/utils/test_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,116 +60,106 @@


@pytest.mark.parametrize(
"schema, actual, expected",
"schema, actual, expected, expected_warns",
[
(
SIMPLE_SCHEMA,
{"value": 12},
{"value": "12"},
),
(
SIMPLE_SCHEMA,
{"value": 12},
{"value": "12"},
),
(
COMPLEX_SCHEMA,
{"value": 1, "array": ["111", 111, {1: 111}]},
{"value": True, "array": ["111", "111", "{1: 111}"]},
),
(SIMPLE_SCHEMA, {"value": 12}, {"value": "12"}, None),
(SIMPLE_SCHEMA, {"value": 12}, {"value": "12"}, None),
(SIMPLE_SCHEMA, {"value": 12, "unexpected_value": "unexpected"}, {"value": "12", "unexpected_value": "unexpected"}, None),
(COMPLEX_SCHEMA, {"value": 1, "array": ["111", 111, {1: 111}]}, {"value": True, "array": ["111", "111", "{1: 111}"]}, None),
(
COMPLEX_SCHEMA,
{"value": 1, "list_of_lists": [["111"], [111], [11], [{1: 1}]]},
{"value": True, "list_of_lists": [["111"], ["111"], ["11"], ["{1: 1}"]]},
None,
),
(
COMPLEX_SCHEMA,
{"value": 1, "nested": {"a": [1, 2, 3]}},
{"value": True, "nested": {"a": "[1, 2, 3]"}},
),
(
COMPLEX_SCHEMA,
{"value": "false", "nested": {"a": [1, 2, 3]}},
{"value": False, "nested": {"a": "[1, 2, 3]"}},
),
(COMPLEX_SCHEMA, {}, {}),
(COMPLEX_SCHEMA, {"int_prop": "12"}, {"int_prop": 12}),
(COMPLEX_SCHEMA, {"value": 1, "nested": {"a": [1, 2, 3]}}, {"value": True, "nested": {"a": "[1, 2, 3]"}}, None),
(COMPLEX_SCHEMA, {"value": "false", "nested": {"a": [1, 2, 3]}}, {"value": False, "nested": {"a": "[1, 2, 3]"}}, None),
(COMPLEX_SCHEMA, {}, {}, None),
(COMPLEX_SCHEMA, {"int_prop": "12"}, {"int_prop": 12}, None),
# Skip invalidly formatted field and process other fields.
(
COMPLEX_SCHEMA,
{"prop": 12, "number_prop": "aa12", "array": [12]},
{"prop": "12", "number_prop": "aa12", "array": ["12"]},
"'aa12' is not of type 'number'",
),
# Field too_many_types has an ambiguous type, skip formatting
(
COMPLEX_SCHEMA,
{"prop": 12, "too_many_types": 1212, "array": [12]},
{"prop": "12", "too_many_types": 1212, "array": ["12"]},
"1212 is not of type 'boolean', 'null', 'string'",
),
# Test null field
(
COMPLEX_SCHEMA,
{"prop": None, "array": [12]},
{"prop": "None", "array": ["12"]},
),
(COMPLEX_SCHEMA, {"prop": None, "array": [12]}, {"prop": "None", "array": ["12"]}, None),
# If field can be null do not convert
(
COMPLEX_SCHEMA,
{"prop_with_null": None, "array": [12]},
{"prop_with_null": None, "array": ["12"]},
),
(COMPLEX_SCHEMA, {"prop_with_null": None, "array": [12]}, {"prop_with_null": None, "array": ["12"]}, None),
(
VERY_NESTED_SCHEMA,
{"very_nested_value": {"very_nested_value": {"very_nested_value": {"very_nested_value": {"very_nested_value": "2"}}}}},
{"very_nested_value": {"very_nested_value": {"very_nested_value": {"very_nested_value": {"very_nested_value": 2.0}}}}},
None,
),
(
VERY_NESTED_SCHEMA,
{"very_nested_value": {"very_nested_value": None}},
{"very_nested_value": {"very_nested_value": None}},
),
(VERY_NESTED_SCHEMA, {"very_nested_value": {"very_nested_value": None}}, {"very_nested_value": {"very_nested_value": None}}, None),
# Object without properties
(
{"type": "object"},
{"value": 12},
{"value": 12},
),
({"type": "object"}, {"value": 12}, {"value": 12}, None),
(
# Array without items
{"type": "object", "properties": {"value": {"type": "array"}}},
{"value": [12]},
{"value": [12]},
None,
),
(
# Array without items and value is not an array
{"type": "object", "properties": {"value": {"type": "array"}}},
{"value": "12"},
{"value": "12"},
"'12' is not of type 'array'",
),
(
# Schema root object is not an object, no conversion should happen
{"type": "integer"},
{"value": "12"},
{"value": "12"},
"{'value': '12'} is not of type 'integer'",
),
(
# More than one type except null, no conversion should happen
{"type": "object", "properties": {"value": {"type": ["string", "boolean", "null"]}}},
{"value": 12},
{"value": 12},
"12 is not of type 'string', 'boolean', 'null'",
),
(
# oneOf is not supported, no conversion for one_of_value should happen
{"type": "object", "properties": {"one_of_value": {"oneOf": ["string", "boolean", "null"]}, "value_2": {"type": "string"}}},
{"one_of_value": 12, "value_2": 12},
{"one_of_value": 12, "value_2": "12"},
None,
),
(
# Case for issue #7076 (Facebook marketing: prints tons of WARN messages)
{
"properties": {
"cpc": {"type": ["null", "number"]},
},
},
{"cpc": "6.6666"},
{"cpc": 6.6666},
None,
),
],
)
def test_transform(schema, actual, expected):
def test_transform(schema, actual, expected, expected_warns, capsys):
t = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
t.transform(actual, schema)
assert json.dumps(actual) == json.dumps(expected)
stdout = capsys.readouterr().out
if expected_warns:
assert expected_warns in stdout
else:
assert not stdout


def test_transform_wrong_config():
Expand Down

0 comments on commit e56e86d

Please sign in to comment.