Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CDK: Add base pydantic model for connector config and schemas #8485

Merged
merged 11 commits into from
Dec 7, 2021
Prev Previous commit
Next Next commit
add schema base models, fix spelling, signatures and polishing
  • Loading branch information
eugene-kulak committed Dec 7, 2021
commit 787ef131153a5af54ac6b4fed28bd89f43a3da0c
2 changes: 1 addition & 1 deletion airbyte-cdk/python/airbyte_cdk/destinations/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def run_cmd(self, parsed_args: argparse.Namespace) -> Iterable[AirbyteMessage]:
return
config = self.read_config(config_path=parsed_args.config)
if self.check_config_against_spec or cmd == "check":
check_config_against_spec_or_exit(config, spec, self.logger)
check_config_against_spec_or_exit(config, spec)

if cmd == "check":
yield self._run_check(config=config)
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/airbyte_cdk/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]:
# jsonschema's additionalProperties flag wont fail the validation
config, internal_config = split_config(config)
if self.source.check_config_against_spec or cmd == "check":
check_config_against_spec_or_exit(config, source_spec, self.logger)
check_config_against_spec_or_exit(config, source_spec)
# Put internal flags back to config dict
config.update(internal_config.dict())

Expand Down
90 changes: 44 additions & 46 deletions airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,49 @@
from typing import Any, ClassVar, Dict, Mapping, Tuple

import jsonref
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import ConnectorSpecification
from jsonschema import validate
from jsonschema.exceptions import ValidationError
from pydantic import BaseModel, Field


class JsonFileLoader:
    """
    Custom json file loader to resolve references to resources located in "shared" directory.
    We need this for compatibility with existing schemas because all of them have references
    pointing to shared_schema.json file instead of shared/shared_schema.json.
    """

    def __init__(self, uri_base: str, shared: str):
        # uri_base: base directory of the schema files; any occurrence of it in a
        # reference URI is redirected into the shared subdirectory below.
        self.shared = shared
        self.uri_base = uri_base

    def __call__(self, uri: str) -> Dict[str, Any]:
        """Load and parse the JSON document at ``uri``, rewriting the path into the shared directory."""
        uri = uri.replace(self.uri_base, f"{self.uri_base}/{self.shared}/")
        # Use a context manager so the file handle is closed deterministically
        # (the original `json.load(open(uri))` left closing to the GC).
        with open(uri) as shared_file:
            return json.load(shared_file)


def resolve_ref_links(obj: Any) -> Dict[str, Any]:
    """
    Recursively convert jsonref.JsonRef wrapper objects into plain, JSON-serializable values.

    :param obj: jsonschema object whose $ref fields have already been resolved.
    :return: equivalent structure with no jsonref wrapper objects remaining.
    """
    if isinstance(obj, jsonref.JsonRef):
        resolved = resolve_ref_links(obj.__subject__)
        # The external resource's own "definitions" section is no longer
        # needed once its references have been inlined.
        resolved.pop("definitions", None)
        return resolved
    if isinstance(obj, dict):
        return {key: resolve_ref_links(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [resolve_ref_links(element) for element in obj]
    return obj


class ResourceSchemaLoader:
"""JSONSchema loader from package resources"""

Expand All @@ -42,10 +78,8 @@ def get_schema(self, name: str) -> dict:
raise IOError(f"Cannot find file {schema_filename}")
try:
raw_schema = json.loads(raw_file)
except ValueError:
# TODO use proper logging
print(f"Invalid JSON file format for file {schema_filename}")
raise
except ValueError as err:
raise RuntimeError(f"Invalid JSON file format for file {schema_filename}") from err

return self.__resolve_schema_references(raw_schema)

Expand All @@ -57,58 +91,20 @@ def __resolve_schema_references(self, raw_schema: dict) -> dict:
:return JSON serializable object with references without external dependencies.
"""

class JsonFileLoader:
"""
Custom json file loader to resolve references to resources located in "shared" directory.
We need this for compatability with existing schemas cause all of them have references
pointing to shared_schema.json file instead of shared/shared_schema.json
"""

def __init__(self, uri_base: str, shared: str):
self.shared = shared
self.uri_base = uri_base

def __call__(self, uri: str) -> Dict[str, Any]:
uri = uri.replace(self.uri_base, f"{self.uri_base}/{self.shared}/")
return json.load(open(uri))

package = importlib.import_module(self.package_name)
base = os.path.dirname(package.__file__) + "/"

def resolve_ref_links(obj: Any) -> Dict[str, Any]:
"""
Scan resolved schema and convert jsonref.JsonRef object to JSON
serializable dict.

:param obj - jsonschema object with ref field resovled.
:return JSON serializable object with references without external dependencies.
"""
if isinstance(obj, jsonref.JsonRef):
obj = resolve_ref_links(obj.__subject__)
# Omit existance definitions for extenal resource since
# we dont need it anymore.
obj.pop("definitions", None)
return obj
elif isinstance(obj, dict):
return {k: resolve_ref_links(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [resolve_ref_links(item) for item in obj]
else:
return obj

resolved = jsonref.JsonRef.replace_refs(raw_schema, loader=JsonFileLoader(base, "schemas/shared"), base_uri=base)
resolved = resolve_ref_links(resolved)
return resolved


def check_config_against_spec_or_exit(config: Mapping[str, Any], spec: ConnectorSpecification, logger: AirbyteLogger):
def check_config_against_spec_or_exit(config: Mapping[str, Any], spec: ConnectorSpecification):
"""
Check config object against spec. In case of spec is invalid, throws
an exception with validation error description.

:param config - config loaded from file specified over command line
:param spec - spec object generated by connector
:param logger - Airbyte logger for reporting validation error
"""
spec_schema = spec.connectionSpecification
try:
Expand All @@ -122,8 +118,10 @@ class InternalConfig(BaseModel):
limit: int = Field(None, alias="_limit")
page_size: int = Field(None, alias="_page_size")

def dict(self):
return super().dict(by_alias=True, exclude_unset=True)
def dict(self, *args, **kwargs):
    """Serialize the model, always emitting field aliases (e.g. ``_limit``) and
    omitting fields that were never set, regardless of caller-supplied kwargs."""
    kwargs["by_alias"] = True
    kwargs["exclude_unset"] = True
    return super().dict(*args, **kwargs)


def split_config(config: Mapping[str, Any]) -> Tuple[dict, InternalConfig]:
Expand Down
72 changes: 72 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/utils/schema_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from typing import Type, Dict, Any, Optional

from pydantic import BaseModel, Extra
from pydantic.main import ModelMetaclass
from pydantic.typing import resolve_annotations


class AllOptional(ModelMetaclass):
    """
    Metaclass for marking all Pydantic model fields as Optional.

    Example:
        class MyModel(BaseModel, metaclass=AllOptional):
            a: str
            b: str

    is equivalent to:
        class MyModel(BaseModel):
            a: Optional[str]
            b: Optional[str]

    It makes the code clearer and eliminates a lot of manual work.
    """

    def __new__(mcs, name, bases, namespaces, **kwargs):
        """
        Iterate through fields and wrap them with typing.Optional type.
        """
        annotations = resolve_annotations(namespaces.get("__annotations__", {}), namespaces.get("__module__", None))
        # Merge in annotations inherited from base classes so they get wrapped too.
        # NOTE(review): with this merge order, a base-class annotation overrides a
        # same-named annotation declared on the class itself — confirm intended.
        for base in bases:
            annotations = {**annotations, **getattr(base, "__annotations__", {})}
        for field in annotations:
            # Skip dunder entries (e.g. __module__-style annotations).
            if not field.startswith("__"):
                annotations[field] = Optional[annotations[field]]
        namespaces["__annotations__"] = annotations
        return super().__new__(mcs, name, bases, namespaces, **kwargs)


class BaseSchemaModel(BaseModel):
    """
    Base class for all schema models. It has some extra schema postprocessing.
    Can be used in combination with AllOptional metaclass.
    """

    class Config:
        # Accept properties not declared on the model instead of rejecting them.
        extra = Extra.allow

    @classmethod
    def schema_extra(cls, schema: Dict[str, Any], model: Type[BaseModel]) -> None:
        """ Modify generated jsonschema, remove "title", "description" and "required" fields.

        Pydantic doesn't treat Union[None, Any] type correctly when generate jsonschema,
        so we can't set field as nullable (i.e. field that can have either null and non-null values),
        We generate this jsonschema value manually.

        :param schema: generated jsonschema, mutated in place
        :param model: the model class the schema was generated for
        """
        schema.pop("title", None)
        schema.pop("description", None)
        schema.pop("required", None)
        for name, prop in schema.get("properties", {}).items():
            prop.pop("title", None)
            prop.pop("description", None)
            allow_none = model.__fields__[name].allow_none
            if allow_none:
                if "type" in prop:
                    # Scalar property: allow null alongside the original type.
                    prop["type"] = ["null", prop["type"]]
                elif "$ref" in prop:
                    # Reference property: express nullability via oneOf, since
                    # siblings next to "$ref" are not honored in jsonschema.
                    ref = prop.pop("$ref")
                    prop["oneOf"] = [{"type": "null"}, {"$ref": ref}]
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def test_run_check(self, mocker, destination: Destination, tmp_path):
# Affirm to Mypy that this is indeed a method on this mock
destination.check.assert_called_with(logger=ANY, config=dummy_config) # type: ignore
# Check if config validation has been called
validate_mock.assert_called_with(dummy_config, spec_msg, destination.logger)
validate_mock.assert_called_with(dummy_config, spec_msg)

# verify output was correct
assert _wrapped(expected_check_result) == returned_check_result
Expand Down Expand Up @@ -224,7 +224,7 @@ def test_run_write(self, mocker, destination: Destination, tmp_path, monkeypatch
input_messages=OrderedIterableMatcher(mocked_input),
)
# Check if config validation has been called
validate_mock.assert_called_with(dummy_config, spec_msg, destination.logger)
validate_mock.assert_called_with(dummy_config, spec_msg)

# verify output was correct
assert expected_write_result == returned_write_result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def spec_object():
def test_check_config_against_spec_or_exit_does_not_print_schema(capsys, spec_object):
config = {"super_secret_token": "really_a_secret"}
with pytest_raises(Exception) as ex_info:
check_config_against_spec_or_exit(config, spec_object, logger)
check_config_against_spec_or_exit(config, spec_object)
exc = ex_info.value
traceback.print_exception(type(exc), exc, exc.__traceback__)
out, err = capsys.readouterr()
Expand All @@ -67,7 +67,7 @@ def test_check_config_against_spec_or_exit_does_not_print_schema(capsys, spec_ob

def test_should_not_fail_validation_for_valid_config(spec_object):
config = {"api_token": "something"}
check_config_against_spec_or_exit(config, spec_object, logger)
check_config_against_spec_or_exit(config, spec_object)
assert True, "should pass validation with valid config"


Expand Down
54 changes: 54 additions & 0 deletions airbyte-cdk/python/unit_tests/sources/utils/test_schema_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from typing import List, Optional

from airbyte_cdk.sources.utils.schema_models import BaseSchemaModel, AllOptional


class InnerClass(BaseSchemaModel):
    """Nested model used as a $ref target in the schemas under test."""

    # field1 is explicitly nullable; field2 is required and non-null.
    field1: Optional[str]
    field2: int


class SchemaWithFewNullables(BaseSchemaModel):
    """Model mixing nullable scalar, nullable nested model and a required list,
    exercising each nullability branch of the schema postprocessing."""

    name: Optional[str]
    optional_item: Optional[InnerClass]
    items: List[InnerClass]


class SchemaWithAllOptional(BaseSchemaModel, metaclass=AllOptional):
    """Model whose fields should all become nullable via the AllOptional
    metaclass, without explicit Optional annotations."""

    object_id: int
    item: InnerClass


class TestSchemaWithFewNullables:
    """Checks jsonschema postprocessing for a model with explicitly Optional fields."""

    EXPECTED_SCHEMA = {
        "type": "object",
        "properties": {
            "name": {"type": ["null", "string"]},
            "optional_item": {"oneOf": [{"type": "null"}, {"$ref": "#/definitions/InnerClass"}]},
            "items": {"type": "array", "items": {"$ref": "#/definitions/InnerClass"}},
        },
        "definitions": {
            "InnerClass": {
                "type": "object",
                "properties": {
                    "field1": {"type": ["null", "string"]},
                    "field2": {"type": "integer"},
                },
            }
        },
    }

    def test_schema_postprocessing(self):
        assert SchemaWithFewNullables.schema() == self.EXPECTED_SCHEMA


class TestSchemaWithAllOptional:
    """Checks jsonschema postprocessing for a model using the AllOptional metaclass."""

    EXPECTED_SCHEMA = {
        "type": "object",
        "properties": {
            "object_id": {"type": ["null", "integer"]},
            "item": {"oneOf": [{"type": "null"}, {"$ref": "#/definitions/InnerClass"}]},
        },
        "definitions": {
            "InnerClass": {
                "type": "object",
                "properties": {
                    "field1": {"type": ["null", "string"]},
                    "field2": {"type": "integer"},
                },
            }
        },
    }

    def test_schema_postprocessing(self):
        assert SchemaWithAllOptional.schema() == self.EXPECTED_SCHEMA