Skip to content

Commit

Permalink
🐛 CDK: Fix the logging of unhandled exceptions to show stacktrace (ai…
Browse files Browse the repository at this point in the history
…rbytehq#8704)

* print stacktrace for unhandled exceptions

Co-authored-by: Eugene Kulak <kulak.eugene@gmail.com>
  • Loading branch information
keu and eugene-kulak authored Dec 13, 2021
1 parent 53c4047 commit d50ae47
Show file tree
Hide file tree
Showing 16 changed files with 138 additions and 84 deletions.
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.1.43
Fix logging of unhandled exceptions: print stacktrace.

## 0.1.42
Add base pydantic model for connector config and schemas.

Expand Down
4 changes: 4 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from .connector import AirbyteSpec, Connector
from .entrypoint import AirbyteEntrypoint
from .logger import AirbyteLogger
Expand Down
4 changes: 4 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/destinations/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from .destination import Destination

__all__ = ["Destination"]
10 changes: 4 additions & 6 deletions airbyte-cdk/python/airbyte_cdk/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import logging.config
import sys
import traceback
from functools import partial
from typing import List

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage
Expand Down Expand Up @@ -38,15 +37,14 @@ def init_unhandled_exception_output_filtering(logger: logging.Logger) -> None:
secrets removed.
"""

def hook_fn(_logger, exception_type, exception_value, traceback_):
def hook_fn(exception_type, exception_value, traceback_):
# For developer ergonomics, we want to see the stack trace in the logs when we do a ctrl-c
if issubclass(exception_type, KeyboardInterrupt):
sys.__excepthook__(exception_type, exception_value, traceback_)
return

logger.critical(str(exception_value))
else:
logger.critical(exception_value, exc_info=exception_value)

sys.excepthook = partial(hook_fn, logger)
sys.excepthook = hook_fn


def init_logger(name: str = None):
Expand Down
7 changes: 6 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/sources/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from .abstract_source import AbstractSource
from .config import BaseConfig
from .source import Source

__all__ = ["AbstractSource", "Source"]
__all__ = ["AbstractSource", "BaseConfig", "Source"]
47 changes: 4 additions & 43 deletions airbyte-cdk/python/airbyte_cdk/sources/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from typing import Any, Dict, List, MutableMapping, Optional
from typing import Any, Dict

from jsonschema import RefResolver
from airbyte_cdk.sources.utils.schema_helpers import expand_refs, rename_key
from pydantic import BaseModel


Expand All @@ -16,50 +16,11 @@ class BaseConfig(BaseModel):
- drop description
"""

@classmethod
def _rename_key(cls, schema: Any, old_key: str, new_key: str) -> None:
"""Iterate over nested dictionary and replace one key with another. Used to replace anyOf with oneOf. Recursive."
:param schema: schema that will be patched
:param old_key: name of the key to replace
:param new_key: new name of the key
"""
if not isinstance(schema, MutableMapping):
return

for key, value in schema.items():
cls._rename_key(value, old_key, new_key)
if old_key in schema:
schema[new_key] = schema.pop(old_key)

@classmethod
def _expand_refs(cls, schema: Any, ref_resolver: Optional[RefResolver] = None) -> None:
"""Iterate over schema and replace all occurrences of $ref with their definitions. Recursive.
:param schema: schema that will be patched
:param ref_resolver: resolver to get definition from $ref, if None pass it will be instantiated
"""
ref_resolver = ref_resolver or RefResolver.from_schema(schema)

if isinstance(schema, MutableMapping):
if "$ref" in schema:
ref_url = schema.pop("$ref")
_, definition = ref_resolver.resolve(ref_url)
cls._expand_refs(definition, ref_resolver=ref_resolver) # expand refs in definitions as well
schema.update(definition)
else:
for key, value in schema.items():
cls._expand_refs(value, ref_resolver=ref_resolver)
elif isinstance(schema, List):
for value in schema:
cls._expand_refs(value, ref_resolver=ref_resolver)

@classmethod
def schema(cls, **kwargs) -> Dict[str, Any]:
"""We're overriding the schema classmethod to enable some post-processing"""
schema = super().schema(**kwargs)
cls._rename_key(schema, old_key="anyOf", new_key="oneOf") # UI supports only oneOf
cls._expand_refs(schema) # UI and destination doesn't support $ref's
schema.pop("definitions", None) # remove definitions created by $ref
rename_key(schema, old_key="anyOf", new_key="oneOf") # UI supports only oneOf
expand_refs(schema)
schema.pop("description", None) # description added from the docstring
return schema
4 changes: 4 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/singer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from .singer_helpers import SingerHelper, SyncModeInfo
from .source import SingerSource

Expand Down
4 changes: 4 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

# Initialize Streams Package
from .core import Stream

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

# Initialize Streams Package
from .exceptions import UserDefinedBackoffException
from .http import HttpStream, HttpSubStream
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

# Initialize Auth Package
from .core import HttpAuthenticator, NoAuth
from .oauth import Oauth2Authenticator
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,5 @@
#
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from .oauth import Oauth2Authenticator
Expand Down
53 changes: 50 additions & 3 deletions airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
import json
import os
import pkgutil
from typing import Any, ClassVar, Dict, Mapping, Tuple
from typing import Any, ClassVar, Dict, List, Mapping, MutableMapping, Optional, Tuple, Union

import jsonref
from airbyte_cdk.models import ConnectorSpecification
from jsonschema import validate
from jsonschema import RefResolver, validate
from jsonschema.exceptions import ValidationError
from pydantic import BaseModel, Field

Expand All @@ -32,7 +32,7 @@ def __call__(self, uri: str) -> Dict[str, Any]:
return json.load(open(uri))


def resolve_ref_links(obj: Any) -> Dict[str, Any]:
def resolve_ref_links(obj: Any) -> Union[Dict[str, Any], List[Any]]:
"""
Scan resolved schema and convert jsonref.JsonRef object to JSON serializable dict.
Expand All @@ -53,6 +53,53 @@ def resolve_ref_links(obj: Any) -> Dict[str, Any]:
return obj


def _expand_refs(schema: Any, ref_resolver: Optional[RefResolver] = None) -> None:
    """Internal recursive helper: replace every ``$ref`` in ``schema`` with the definition it points to, in place.

    :param schema: schema (or schema fragment) that will be patched
    :param ref_resolver: resolver used to look up the definition behind a ``$ref``; if not passed, one is created from ``schema``
    """
    resolver = ref_resolver or RefResolver.from_schema(schema)

    if isinstance(schema, MutableMapping):
        if "$ref" not in schema:
            # Plain object: descend into each value looking for nested references.
            for _, nested in schema.items():
                _expand_refs(nested, ref_resolver=resolver)
            return

        # Swap the reference for a copy of the keys of its (fully expanded) definition.
        reference = schema.pop("$ref")
        _, target = resolver.resolve(reference)
        _expand_refs(target, ref_resolver=resolver)  # expand refs in definitions as well
        schema.update(target)
        return

    if isinstance(schema, List):
        for element in schema:
            _expand_refs(element, ref_resolver=resolver)


def expand_refs(schema: Any) -> None:
    """Replace all occurrences of ``$ref`` in ``schema`` with their definitions, patching the schema in place.

    :param schema: schema that will be patched
    """
    _expand_refs(schema)
    # Every reference has been inlined above, so the "definitions" section is dropped from the schema.
    schema.pop("definitions", None)


def rename_key(schema: Any, old_key: str, new_key: str) -> None:
    """Recursively rename ``old_key`` to ``new_key`` everywhere in a nested schema, in place.

    Used to replace anyOf with oneOf. Both mappings and lists are traversed, so occurrences nested inside
    array-valued keywords (for example the elements of a ``oneOf`` or ``items`` list) are renamed as well.

    :param schema: schema that will be patched; values that are neither a mapping nor a list are ignored
    :param old_key: name of the key to replace
    :param new_key: new name of the key
    """
    if isinstance(schema, list):
        for item in schema:
            rename_key(item, old_key, new_key)
        return

    if not isinstance(schema, MutableMapping):
        return

    # Walk a snapshot of the values: the rename below changes the mapping's keys, and a dict must not
    # have keys added or removed while it is being iterated.
    for value in list(schema.values()):
        rename_key(value, old_key, new_key)

    # Rename at this level only after every nested value has been processed.
    if old_key in schema:
        schema[new_key] = schema.pop(old_key)


class ResourceSchemaLoader:
"""JSONSchema loader from package resources"""

Expand Down
8 changes: 8 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/utils/schema_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from typing import Any, Dict, Optional, Type

from airbyte_cdk.sources.utils.schema_helpers import expand_refs
from pydantic import BaseModel, Extra
from pydantic.main import ModelMetaclass
from pydantic.typing import resolve_annotations
Expand Down Expand Up @@ -74,3 +75,10 @@ def schema_extra(cls, schema: Dict[str, Any], model: Type[BaseModel]) -> None:
elif "$ref" in prop:
ref = prop.pop("$ref")
prop["oneOf"] = [{"type": "null"}, {"$ref": ref}]

@classmethod
def schema(cls, **kwargs) -> Dict[str, Any]:
    """Build the model schema via the parent implementation, then post-process it by expanding its ``$ref`` entries."""
    generated = super().schema(**kwargs)
    expand_refs(generated)  # patches the dict in place
    return generated
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.1.42",
version="0.1.43",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
25 changes: 16 additions & 9 deletions airbyte-cdk/python/unit_tests/sources/utils/test_schema_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,16 @@ class TestSchemaWithFewNullables:
"type": "object",
"properties": {
"name": {"type": ["null", "string"]},
"optional_item": {"oneOf": [{"type": "null"}, {"$ref": "#/definitions/InnerClass"}]},
"items": {"type": "array", "items": {"$ref": "#/definitions/InnerClass"}},
},
"definitions": {
"InnerClass": {"type": "object", "properties": {"field1": {"type": ["null", "string"]}, "field2": {"type": "integer"}}}
"optional_item": {
"oneOf": [
{"type": "null"},
{"type": "object", "properties": {"field1": {"type": ["null", "string"]}, "field2": {"type": "integer"}}},
]
},
"items": {
"type": "array",
"items": {"type": "object", "properties": {"field1": {"type": ["null", "string"]}, "field2": {"type": "integer"}}},
},
},
}

Expand All @@ -46,10 +51,12 @@ class TestSchemaWithAllOptional:
"type": "object",
"properties": {
"object_id": {"type": ["null", "integer"]},
"item": {"oneOf": [{"type": "null"}, {"$ref": "#/definitions/InnerClass"}]},
},
"definitions": {
"InnerClass": {"type": "object", "properties": {"field1": {"type": ["null", "string"]}, "field2": {"type": "integer"}}}
"item": {
"oneOf": [
{"type": "null"},
{"type": "object", "properties": {"field1": {"type": ["null", "string"]}, "field2": {"type": "integer"}}},
]
},
},
}

Expand Down
21 changes: 21 additions & 0 deletions airbyte-cdk/python/unit_tests/test_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@

import json
import logging
import subprocess
import sys
from typing import Dict

import pytest
from airbyte_cdk.logger import AirbyteLogFormatter
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage


@pytest.fixture(scope="session")
Expand Down Expand Up @@ -92,3 +95,21 @@ def test_fatal(logger, caplog):
record = caplog.records[0]
assert record.levelname == "CRITICAL"
assert record.message == "Test fatal 1"


def test_unhandled_logger():
    """Check that an unhandled exception is emitted as a single FATAL Airbyte log message that includes the traceback.

    A child interpreter initialises the logger and then executes ``raise 1``, which fails with the
    ``TypeError`` spelled out in ``expected_message``. Everything the child prints must equal the
    serialized log message exactly.
    """
    cmd = "from airbyte_cdk.logger import init_logger; init_logger('airbyte'); raise 1"
    # Expected log text: the exception message first, followed by the formatted traceback.
    expected_message = (
        "exceptions must derive from BaseException\n"
        "Traceback (most recent call last):\n"
        '  File "<string>", line 1, in <module>\n'
        "TypeError: exceptions must derive from BaseException"
    )
    log_message = AirbyteMessage(type="LOG", log=AirbyteLogMessage(level="FATAL", message=expected_message))
    expected_output = log_message.json(exclude_unset=True)

    # check_output raises CalledProcessError when the child exits with a non-zero status.
    with pytest.raises(subprocess.CalledProcessError) as err:
        # stderr is merged into stdout, so anything the child writes to either stream ends up in `output`.
        subprocess.check_output([sys.executable, "-c", cmd], stderr=subprocess.STDOUT)

    # NOTE(review): with stderr redirected to STDOUT, `err.value.stderr` is presumably always None, which would
    # make this assertion vacuous; the exact-output comparison below is what actually guards against stray output.
    assert not err.value.stderr, "nothing on the stderr"
    assert err.value.output.decode("utf-8").strip() == expected_output, "Error should be printed in expected form"

0 comments on commit d50ae47

Please sign in to comment.