Skip to content

Commit

Permalink
🎉 CDK: use standard logger with custom handler
Browse files Browse the repository at this point in the history
* CDK - using native logger with custom formatter

* CDK - using native logger(fix exception and add trace)

* CDK - using native logger in AirbyteEntrypoint

* CDK - CHANGELOG.md

* CDK - remove unnecessary imports

* CDK - fixing according to PR review

* CDK native logger airbytehq#1279 - annotations

* CDK native logger airbytehq#1279 - fixing according to PR review

* CDK standard logger airbytehq#1279 - tests

* CDK standard logger airbytehq#1279 - reformat

* Airbyte CDK airbytehq#1279 - improve docstrings

* Airbyte CDK airbytehq#1279 - improve log levels

* Airbyte CDK airbytehq#1279 - fix init get source name

* Airbyte CDK airbytehq#1279 - update test licence

* Airbyte CDK airbytehq#1279 - bump version
  • Loading branch information
vitaliizazmic authored Oct 12, 2021
1 parent fae6d47 commit d5c0499
Show file tree
Hide file tree
Showing 9 changed files with 168 additions and 14 deletions.
6 changes: 6 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## 0.1.27
Improving unit test for logger

## 0.1.26
Use python standard logging instead of custom class

## 0.1.25
Modified `OAuth2Specification` model, added new fields: `rootObject` and `oauthFlowOutputParameters`

Expand Down
19 changes: 9 additions & 10 deletions airbyte-cdk/python/airbyte_cdk/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,16 @@
import tempfile
from typing import Iterable, List

from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.logger import init_logger
from airbyte_cdk.models import AirbyteMessage, Status, Type
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, split_config

logger = AirbyteLogger()


class AirbyteEntrypoint(object):
def __init__(self, source: Source):
self.source = source
self.logger = init_logger(getattr(source, "name", "source"))

def parse_args(self, args: List[str]) -> argparse.Namespace:
# set up parent parsers
Expand Down Expand Up @@ -61,7 +60,7 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]:
raise Exception("No command passed")

# todo: add try catch for exceptions with different exit codes
source_spec = self.source.spec(logger)
source_spec = self.source.spec(self.logger)

with tempfile.TemporaryDirectory() as temp_dir:
if cmd == "spec":
Expand All @@ -73,26 +72,26 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]:
# Remove internal flags from config before validating so
# jsonschema's additionalProperties flag wont fail the validation
config, internal_config = split_config(config)
check_config_against_spec_or_exit(config, source_spec, logger)
check_config_against_spec_or_exit(config, source_spec, self.logger)
# Put internal flags back to config dict
config.update(internal_config.dict())

if cmd == "check":
check_result = self.source.check(logger, config)
check_result = self.source.check(self.logger, config)
if check_result.status == Status.SUCCEEDED:
logger.info("Check succeeded")
self.logger.info("Check succeeded")
else:
logger.error("Check failed")
self.logger.error("Check failed")

output_message = AirbyteMessage(type=Type.CONNECTION_STATUS, connectionStatus=check_result).json(exclude_unset=True)
yield output_message
elif cmd == "discover":
catalog = self.source.discover(logger, config)
catalog = self.source.discover(self.logger, config)
yield AirbyteMessage(type=Type.CATALOG, catalog=catalog).json(exclude_unset=True)
elif cmd == "read":
config_catalog = self.source.read_catalog(parsed_args.catalog)
state = self.source.read_state(parsed_args.state)
generator = self.source.read(logger, config, config_catalog, state)
generator = self.source.read(self.logger, config, config_catalog, state)
for message in generator:
yield message.json(exclude_unset=True)
else:
Expand Down
68 changes: 67 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,83 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#


import logging
import logging.config
import traceback

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage

TRACE_LEVEL_NUM = 5

LOGGING_CONFIG = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"airbyte": {"()": "airbyte_cdk.logger.AirbyteLogFormatter", "format": "%(message)s"},
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"stream": "ext://sys.stdout",
"formatter": "airbyte",
},
},
"root": {
"handlers": ["console"],
},
}


def init_logger(name: str):
"""Initial set up of logger"""
logging.setLoggerClass(AirbyteNativeLogger)
logging.addLevelName(TRACE_LEVEL_NUM, "TRACE")
logger = logging.getLogger(name)
logger.setLevel(TRACE_LEVEL_NUM)
logging.config.dictConfig(LOGGING_CONFIG)
return logger


class AirbyteLogFormatter(logging.Formatter):
"""Output log records using AirbyteMessage"""

def format(self, record: logging.LogRecord) -> str:
"""Return a JSON representation of the log message"""
message = super().format(record)
log_message = AirbyteMessage(type="LOG", log=AirbyteLogMessage(level=record.levelname, message=message))
return log_message.json(exclude_unset=True)


class AirbyteNativeLogger(logging.Logger):
"""Using native logger with implementing all AirbyteLogger features"""

def __init__(self, name):
super().__init__(name)
self.valid_log_types = ["FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE"]

def log_by_prefix(self, msg, default_level):
"""Custom method, which takes log level from first word of message"""
split_line = msg.split()
first_word = next(iter(split_line), None)
if first_word in self.valid_log_types:
log_level = logging.getLevelName(first_word)
rendered_message = " ".join(split_line[1:])
else:
default_level = default_level if default_level in self.valid_log_types else "INFO"
log_level = logging.getLevelName(default_level)
rendered_message = msg
self.log(log_level, rendered_message)

def trace(self, msg, *args, **kwargs):
self._log(TRACE_LEVEL_NUM, msg, args, **kwargs)


class AirbyteLogger:
def __init__(self):
self.valid_log_types = ["FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE"]

def log_by_prefix(self, message, default_level):
"""Custom method, which takes log level from first word of message"""
split_line = message.split()
first_word = next(iter(split_line), None)
if first_word in self.valid_log_types:
Expand Down
2 changes: 2 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@ class Config:

class Level(Enum):
FATAL = "FATAL"
CRITICAL = "CRITICAL"
ERROR = "ERROR"
WARN = "WARN"
WARNING = "WARNING"
INFO = "INFO"
DEBUG = "DEBUG"
TRACE = "TRACE"
Expand Down
6 changes: 4 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@


import inspect
import logging
from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

import airbyte_cdk.sources.utils.casing as casing
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import AirbyteStream, SyncMode
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
Expand All @@ -26,7 +26,9 @@ class Stream(ABC):
"""

# Use self.logger in subclasses to log any messages
logger = AirbyteLogger() # TODO use native "logging" loggers with custom handlers
@property
def logger(self):
return logging.getLogger(f"streams.{self.name}")

# TypeTransformer object to perform output data transformation
transformer: TypeTransformer = TypeTransformer(TransformConfig.NoTransform)
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.1.25",
version="0.1.27",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
75 changes: 75 additions & 0 deletions airbyte-cdk/python/unit_tests/test_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#


import json
from typing import Dict

import pytest
from airbyte_cdk.logger import AirbyteLogFormatter, init_logger


@pytest.fixture(scope="session")
def logger():
logger = init_logger("Test logger")
return logger


def test_formatter(logger, caplog):
formatter = AirbyteLogFormatter()
logger.info("Test formatter")
record = caplog.records[0]
formatted_record = formatter.format(record)
formatted_record_data = json.loads(formatted_record)
assert formatted_record_data.get("type") == "LOG"
log = formatted_record_data.get("log")
assert isinstance(log, Dict)
level = log.get("level")
message = log.get("message")
assert level == "INFO"
assert message == "Test formatter"


def test_trace(logger, caplog):
logger.trace("Test trace 1")
record = caplog.records[0]
assert record.levelname == "TRACE"
assert record.message == "Test trace 1"


def test_debug(logger, caplog):
logger.debug("Test debug 1")
record = caplog.records[0]
assert record.levelname == "DEBUG"
assert record.message == "Test debug 1"


def test_info(logger, caplog):
logger.info("Test info 1")
logger.info("Test info 2")
assert len(caplog.records) == 2
first_record = caplog.records[0]
assert first_record.levelname == "INFO"
assert first_record.message == "Test info 1"


def test_warn(logger, caplog):
logger.warn("Test warn 1")
record = caplog.records[0]
assert record.levelname == "WARNING"
assert record.message == "Test warn 1"


def test_error(logger, caplog):
logger.error("Test error 1")
record = caplog.records[0]
assert record.levelname == "ERROR"
assert record.message == "Test error 1"


def test_fatal(logger, caplog):
logger.fatal("Test fatal 1")
record = caplog.records[0]
assert record.levelname == "CRITICAL"
assert record.message == "Test fatal 1"
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@ class Config:

class Level(Enum):
FATAL = "FATAL"
CRITICAL = "CRITICAL"
ERROR = "ERROR"
WARN = "WARN"
WARNING = "WARNING"
INFO = "INFO"
DEBUG = "DEBUG"
TRACE = "TRACE"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,10 @@ definitions:
type: string
enum:
- FATAL
- CRITICAL
- ERROR
- WARN
- WARNING
- INFO
- DEBUG
- TRACE
Expand Down

0 comments on commit d5c0499

Please sign in to comment.