Skip to content

Commit

Permalink
4776: Python CDK: Validate input config.py against spec (airbytehq#5457)
Browse files Browse the repository at this point in the history
Python CDK: Validate input config.py against spec

Co-authored-by: Dmytro Rezchykov <dmitry.rezchykov@zazmic.com>
  • Loading branch information
avida and Dmytro Rezchykov authored Aug 19, 2021
1 parent 5560df7 commit b1f2bf5
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 37 deletions.
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.1.11
Add checking specified config againt spec for read, write, check and discover commands

## 0.1.10
Add `MultipleTokenAuthenticator` class to allow cycling through a list of API tokens when making HTTP requests

Expand Down
35 changes: 19 additions & 16 deletions airbyte-cdk/python/airbyte_cdk/destinations/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,36 +31,35 @@
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.connector import Connector
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, Type
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit
from pydantic import ValidationError


class Destination(Connector, ABC):
logger = AirbyteLogger()
VALID_CMDS = {"spec", "check", "write"}

@abstractmethod
def write(
self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
) -> Iterable[AirbyteMessage]:
"""Implement to define how the connector writes data to the destination"""

def _run_spec(self) -> AirbyteMessage:
return AirbyteMessage(type=Type.SPEC, spec=self.spec(self.logger))

def _run_check(self, config_path: str) -> AirbyteMessage:
config = self.read_config(config_path=config_path)
def _run_check(self, config: Mapping[str, Any]) -> AirbyteMessage:
check_result = self.check(self.logger, config)
return AirbyteMessage(type=Type.CONNECTION_STATUS, connectionStatus=check_result)

def _parse_input_stream(self, input_stream: io.TextIOWrapper) -> Iterable[AirbyteMessage]:
""" Reads from stdin, converting to Airbyte messages"""
"""Reads from stdin, converting to Airbyte messages"""
for line in input_stream:
try:
yield AirbyteMessage.parse_raw(line)
except ValidationError:
self.logger.info(f"ignoring input which can't be deserialized as Airbyte Message: {line}")

def _run_write(self, config_path: str, configured_catalog_path: str, input_stream: io.TextIOWrapper) -> Iterable[AirbyteMessage]:
config = self.read_config(config_path=config_path)
def _run_write(
self, config: Mapping[str, Any], configured_catalog_path: str, input_stream: io.TextIOWrapper
) -> Iterable[AirbyteMessage]:
catalog = ConfiguredAirbyteCatalog.parse_file(configured_catalog_path)
input_messages = self._parse_input_stream(input_stream)
self.logger.info("Begin writing to the destination...")
Expand Down Expand Up @@ -104,18 +103,22 @@ def parse_args(self, args: List[str]) -> argparse.Namespace:

def run_cmd(self, parsed_args: argparse.Namespace) -> Iterable[AirbyteMessage]:
cmd = parsed_args.command
if cmd not in self.VALID_CMDS:
raise Exception(f"Unrecognized command: {cmd}")

spec = self.spec(self.logger)
if cmd == "spec":
yield self._run_spec()
elif cmd == "check":
yield self._run_check(config_path=parsed_args.config)
yield AirbyteMessage(type=Type.SPEC, spec=spec)
return
config = self.read_config(config_path=parsed_args.config)
check_config_against_spec_or_exit(config, spec, self.logger)

if cmd == "check":
yield self._run_check(config=config)
elif cmd == "write":
# Wrap in UTF-8 to override any other input encodings
wrapped_stdin = io.TextIOWrapper(sys.stdin.buffer, encoding="utf-8")
yield from self._run_write(
config_path=parsed_args.config, configured_catalog_path=parsed_args.catalog, input_stream=wrapped_stdin
)
else:
raise Exception(f"Unrecognized command: {cmd}")
yield from self._run_write(config=config, configured_catalog_path=parsed_args.catalog, input_stream=wrapped_stdin)

def run(self, args: List[str]):
parsed_args = self.parse_args(args)
Expand Down
5 changes: 4 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import AirbyteMessage, Status, Type
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit

logger = AirbyteLogger()

Expand Down Expand Up @@ -80,14 +81,16 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]:
raise Exception("No command passed")

# todo: add try catch for exceptions with different exit codes
source_spec = self.source.spec(logger)

with tempfile.TemporaryDirectory() as temp_dir:
if cmd == "spec":
message = AirbyteMessage(type=Type.SPEC, spec=self.source.spec(logger))
message = AirbyteMessage(type=Type.SPEC, spec=source_spec)
yield message.json(exclude_unset=True)
else:
raw_config = self.source.read_config(parsed_args.config)
config = self.source.configure(raw_config, temp_dir)
check_config_against_spec_or_exit(config, source_spec, logger)

if cmd == "check":
check_result = self.source.check(logger, config)
Expand Down
26 changes: 24 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@
import json
import os
import pkgutil
from typing import Dict
import sys
from typing import Any, Dict, Mapping

import pkg_resources
from jsonschema import RefResolver

from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import ConnectorSpecification
from jsonschema import RefResolver, validate
from jsonschema.exceptions import ValidationError


class JsonSchemaResolver:
Expand Down Expand Up @@ -124,3 +129,20 @@ def get_schema(self, name: str) -> dict:
if os.path.exists(shared_schemas_folder):
return JsonSchemaResolver(shared_schemas_folder).resolve(raw_schema)
return raw_schema


def check_config_against_spec_or_exit(config: Mapping[str, Any], spec: ConnectorSpecification, logger: AirbyteLogger):
"""
Check config object against spec. In case of spec is invalid, throws
SystemExit exception causing application to make system exit call with
errorcode 1
:param config - config loaded from file specified over command line
:param spec - spec object generated by connector
:param logger - Airbyte logger for reporting validation error
"""
spec_schema = spec.connectionSpecification
try:
validate(instance=config, schema=spec_schema)
except ValidationError as validation_error:
logger.error("Config validation error: " + validation_error.message)
sys.exit(1)
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

setup(
name="airbyte-cdk",
version="0.1.10",
version="0.1.11",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
11 changes: 10 additions & 1 deletion airbyte-cdk/python/unit_tests/destinations/test_destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,9 @@ def test_run_check(self, mocker, destination: Destination, tmp_path):

parsed_args = argparse.Namespace(**args)
destination.run_cmd(parsed_args)

spec_msg = ConnectorSpecification(connectionSpecification={})
mocker.patch.object(destination, "spec", return_value=spec_msg)
validate_mock = mocker.patch("airbyte_cdk.destinations.destination.check_config_against_spec_or_exit")
expected_check_result = AirbyteConnectionStatus(status=Status.SUCCEEDED)
mocker.patch.object(destination, "check", return_value=expected_check_result, autospec=True)

Expand All @@ -189,6 +191,8 @@ def test_run_check(self, mocker, destination: Destination, tmp_path):
destination.check.assert_called_once() # type: ignore
# Affirm to Mypy that this is indeed a method on this mock
destination.check.assert_called_with(logger=ANY, config=dummy_config) # type: ignore
# Check if config validation has been called
validate_mock.assert_called_with(dummy_config, spec_msg, destination.logger)

# verify output was correct
assert _wrapped(expected_check_result) == returned_check_result
Expand Down Expand Up @@ -216,6 +220,9 @@ def test_run_write(self, mocker, destination: Destination, tmp_path, monkeypatch
mocker.patch.object(
destination, "write", return_value=iter(expected_write_result), autospec=True # convert to iterator to mimic real usage
)
spec_msg = ConnectorSpecification(connectionSpecification={})
mocker.patch.object(destination, "spec", return_value=spec_msg)
validate_mock = mocker.patch("airbyte_cdk.destinations.destination.check_config_against_spec_or_exit")
# mock input is a record followed by some state messages
mocked_input: List[AirbyteMessage] = [_wrapped(_record("s1", {"k1": "v1"})), *expected_write_result]
mocked_stdin_string = "\n".join([record.json(exclude_unset=True) for record in mocked_input])
Expand All @@ -236,6 +243,8 @@ def test_run_write(self, mocker, destination: Destination, tmp_path, monkeypatch
# that iterates over two iterables to check equality
input_messages=OrderedIterableMatcher(mocked_input),
)
# Check if config validation has been called
validate_mock.assert_called_with(dummy_config, spec_msg, destination.logger)

# verify output was correct
assert expected_write_result == returned_write_result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,


def test_next_page_token_is_input_to_other_methods(mocker):
""" Validates that the return value from next_page_token is passed into other methods that need it like request_params, headers, body, etc.."""
"""Validates that the return value from next_page_token is passed into other methods that need it like request_params, headers, body, etc.."""
pages = 5
stream = StubNextPageTokenHttpStream(pages=pages)
blank_response = {} # Send a blank response is fine as we ignore the response in `parse_response anyway.
Expand Down Expand Up @@ -144,6 +144,7 @@ def backoff_time(self, response: requests.Response) -> Optional[float]:


def test_stub_custom_backoff_http_stream(mocker):
mocker.patch("time.sleep", lambda x: None)
stream = StubCustomBackoffHttpStream()
req = requests.Response()
req.status_code = 429
Expand Down
62 changes: 47 additions & 15 deletions airbyte-cdk/python/unit_tests/test_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from argparse import Namespace
from copy import deepcopy
from typing import Any, List, Mapping, MutableMapping, Union
from unittest.mock import MagicMock

import pytest
from airbyte_cdk import AirbyteEntrypoint
Expand Down Expand Up @@ -61,6 +62,14 @@ def _as_arglist(cmd: str, named_args: Mapping[str, Any]) -> List[str]:
return out


@pytest.fixture
def spec_mock(mocker):
expected = ConnectorSpecification(connectionSpecification={})
mock = MagicMock(return_value=expected)
mocker.patch.object(MockSource, "spec", mock)
return mock


@pytest.fixture
def entrypoint() -> AirbyteEntrypoint:
return AirbyteEntrypoint(MockSource())
Expand Down Expand Up @@ -121,40 +130,63 @@ def test_run_spec(entrypoint: AirbyteEntrypoint, mocker):
assert [_wrap_message(expected)] == list(entrypoint.run(parsed_args))


def test_run_check(entrypoint: AirbyteEntrypoint, mocker):
parsed_args = Namespace(command="check", config="config_path")
config = {"username": "fake"}
check_value = AirbyteConnectionStatus(status=Status.SUCCEEDED)
@pytest.fixture
def config_mock(mocker, request):
config = request.param if hasattr(request, "param") else {"username": "fake"}
mocker.patch.object(MockSource, "read_config", return_value=config)
mocker.patch.object(MockSource, "configure", return_value=config)
return config


@pytest.mark.parametrize(
"config_mock, schema, config_valid",
[
({"username": "fake"}, {"type": "object", "properties": {"name": {"type": "string"}}, "additionalProperties": False}, False),
({"username": "fake"}, {"type": "object", "properties": {"username": {"type": "string"}}, "additionalProperties": False}, True),
({"username": "fake"}, {"type": "object", "properties": {"user": {"type": "string"}}}, True),
],
indirect=["config_mock"],
)
def test_config_validate(entrypoint: AirbyteEntrypoint, mocker, config_mock, schema, config_valid):
parsed_args = Namespace(command="check", config="config_path")
check_value = AirbyteConnectionStatus(status=Status.SUCCEEDED)
mocker.patch.object(MockSource, "check", return_value=check_value)
mocker.patch.object(MockSource, "spec", return_value=ConnectorSpecification(connectionSpecification=schema))
if config_valid:
messages = list(entrypoint.run(parsed_args))
assert [_wrap_message(check_value)] == messages
else:
with pytest.raises(SystemExit) as ex_info:
list(entrypoint.run(parsed_args))
assert ex_info.value.code == 1


def test_run_check(entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock):
parsed_args = Namespace(command="check", config="config_path")
check_value = AirbyteConnectionStatus(status=Status.SUCCEEDED)
mocker.patch.object(MockSource, "check", return_value=check_value)
assert [_wrap_message(check_value)] == list(entrypoint.run(parsed_args))
assert spec_mock.called


def test_run_discover(entrypoint: AirbyteEntrypoint, mocker):
def test_run_discover(entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock):
parsed_args = Namespace(command="discover", config="config_path")
config = {"username": "fake"}
expected = AirbyteCatalog(streams=[AirbyteStream(name="stream", json_schema={"k": "v"})])
mocker.patch.object(MockSource, "read_config", return_value=config)
mocker.patch.object(MockSource, "configure", return_value=config)
mocker.patch.object(MockSource, "discover", return_value=expected)
assert [_wrap_message(expected)] == list(entrypoint.run(parsed_args))
assert spec_mock.called


def test_run_read(entrypoint: AirbyteEntrypoint, mocker):
def test_run_read(entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock):
parsed_args = Namespace(command="read", config="config_path", state="statepath", catalog="catalogpath")
config = {"username": "fake"}
expected = AirbyteRecordMessage(stream="stream", data={"data": "stuff"}, emitted_at=1)
mocker.patch.object(MockSource, "read_config", return_value=config)
mocker.patch.object(MockSource, "configure", return_value=config)
mocker.patch.object(MockSource, "read_state", return_value={})
mocker.patch.object(MockSource, "read_catalog", return_value={})
mocker.patch.object(MockSource, "read", return_value=[AirbyteMessage(record=expected, type=Type.RECORD)])
assert [_wrap_message(expected)] == list(entrypoint.run(parsed_args))
assert spec_mock.called


def test_invalid_command(entrypoint: AirbyteEntrypoint, mocker):
def test_invalid_command(entrypoint: AirbyteEntrypoint, mocker, config_mock):
with pytest.raises(Exception):
mocker.patch.object(MockSource, "read_config", return_value={})
mocker.patch.object(MockSource, "configure", return_value={})
list(entrypoint.run(Namespace(command="invalid", config="conf")))

0 comments on commit b1f2bf5

Please sign in to comment.