Convert the server to micronaut #19194

Merged
merged 143 commits into from
Jan 6, 2023
7027679
Extract Operation API
benmoriceau Nov 3, 2022
eb18779
Extract scheduler API
benmoriceau Nov 3, 2022
05ffc3a
Format
benmoriceau Nov 3, 2022
09d63e5
Merge branch 'bmoric/extract-operation-api' of github.com:airbytehq/a…
benmoriceau Nov 3, 2022
585a8a9
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/ext…
benmoriceau Nov 3, 2022
6fc0a85
Merge branch 'bmoric/extract-operation-api' of github.com:airbytehq/a…
benmoriceau Nov 3, 2022
97b33a1
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/ext…
benmoriceau Nov 3, 2022
88c2557
extract source api
benmoriceau Nov 3, 2022
4f799f5
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/ext…
benmoriceau Nov 3, 2022
af86517
Extract source definition api
benmoriceau Nov 4, 2022
c76ed4e
Add path
benmoriceau Nov 4, 2022
6a57dcc
Extract State API
benmoriceau Nov 4, 2022
30b991d
extract webbackend api
benmoriceau Nov 4, 2022
c72225d
extract webbackend api
benmoriceau Nov 4, 2022
ecad901
extract workspace api
benmoriceau Nov 4, 2022
1cf081b
Extract source definition specification api
benmoriceau Nov 4, 2022
e1649ab
Remove configuration API
benmoriceau Nov 4, 2022
7bb500b
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/ext…
benmoriceau Nov 8, 2022
0962577
Merge branch 'master' into bmoric/extract-source-definition-specifica…
benmoriceau Nov 8, 2022
025b12b
Merge branch 'bmoric/extract-source-definition-specification-api' of …
benmoriceau Nov 8, 2022
4582738
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/rm-…
benmoriceau Nov 8, 2022
35a8624
tmp
benmoriceau Nov 9, 2022
f8452dd
Checkstyle
benmoriceau Nov 9, 2022
3d8948d
tmp
benmoriceau Nov 10, 2022
a4a5b5d
tmp
benmoriceau Nov 10, 2022
7639d5d
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/con…
benmoriceau Nov 15, 2022
470f637
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/con…
benmoriceau Nov 15, 2022
abfd514
Inject but don't resolve Bean
benmoriceau Nov 16, 2022
46c9d8e
tmp
benmoriceau Nov 16, 2022
b67653c
Tmp
benmoriceau Nov 16, 2022
b6b397e
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/con…
benmoriceau Nov 18, 2022
3b5b39c
fix build
benmoriceau Nov 18, 2022
1358315
TMP
benmoriceau Nov 21, 2022
1a60f15
Tmp
benmoriceau Nov 22, 2022
6229c09
Clean up
benmoriceau Nov 22, 2022
e24a488
better thread pool
benmoriceau Nov 22, 2022
0d8ae2f
Change port to 8080
benmoriceau Nov 22, 2022
ce1932e
Fix port
benmoriceau Nov 22, 2022
6c4da77
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/con…
benmoriceau Nov 22, 2022
f5b23cf
Rm unused
benmoriceau Nov 22, 2022
bba0af8
Cors filter
benmoriceau Nov 22, 2022
5e2c9a5
Format
benmoriceau Nov 22, 2022
002a39e
rename
benmoriceau Nov 28, 2022
4d36cd1
Tmp
benmoriceau Nov 30, 2022
f5cbc7b
Config based
benmoriceau Nov 30, 2022
3b2e18a
Rm health controller ref
benmoriceau Nov 30, 2022
bad7f07
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/con…
benmoriceau Nov 30, 2022
d3aecda
tmp
benmoriceau Nov 30, 2022
8f03443
Pool size
benmoriceau Dec 1, 2022
4666776
Mock healthcheck
benmoriceau Dec 2, 2022
267094a
Revert "Mock healthcheck"
benmoriceau Dec 2, 2022
95fa38c
Revert "Revert "Mock healthcheck""
benmoriceau Dec 2, 2022
245dd11
Restore health check
benmoriceau Dec 2, 2022
c237c8e
Tmp
benmoriceau Dec 2, 2022
4447ba8
format
benmoriceau Dec 2, 2022
57b0af8
Rm deprecated
benmoriceau Dec 2, 2022
8fef069
Fix PMD
benmoriceau Dec 3, 2022
44146ef
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/con…
benmoriceau Dec 3, 2022
77ff93a
Tmp
benmoriceau Dec 5, 2022
1c84878
Fix proxy test
benmoriceau Dec 5, 2022
a3fadde
Remove useless annotation
benmoriceau Dec 5, 2022
03d902a
set auto commit as false
benmoriceau Dec 7, 2022
caef0f6
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/con…
benmoriceau Dec 7, 2022
3c55fb0
Clean up and PR comments
benmoriceau Dec 7, 2022
92a5efa
Bmoric/convert attempt micronaut (#19847)
benmoriceau Dec 7, 2022
1dd8988
Comments and banner
benmoriceau Dec 7, 2022
6542588
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/con…
benmoriceau Dec 7, 2022
1d5ccae
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/con…
benmoriceau Dec 8, 2022
8402ffd
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/con…
benmoriceau Dec 8, 2022
f78b7ac
Non related files
benmoriceau Dec 8, 2022
04a2cac
rm tmp
benmoriceau Dec 8, 2022
2eece6d
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/con…
benmoriceau Dec 8, 2022
eb31d50
Fix build
benmoriceau Dec 8, 2022
a91d26b
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/con…
benmoriceau Dec 9, 2022
018d3dd
Format
benmoriceau Dec 9, 2022
d50555d
Hit the micronaut server directly
benmoriceau Dec 9, 2022
ce9127e
micronaut OperationApiController (#20270)
colesnodgrass Dec 9, 2022
01d5ebc
Bmoric/convert connection micronaut (#20211)
benmoriceau Dec 9, 2022
fa87e26
Bmoric/convert destination controller micronaut (#20269)
benmoriceau Dec 9, 2022
cb5d151
Bmoric/convert destination definition controller micronaut (#20277)
benmoriceau Dec 9, 2022
d35f939
convert StateApiController to Micronaut (#20329)
colesnodgrass Dec 9, 2022
db06338
Move dest oauth to micronaut (#20318)
benmoriceau Dec 9, 2022
711a7c7
Bmoric/convert source micronaut (#20334)
benmoriceau Dec 9, 2022
45cac68
Migrate to micronaut (#20339)
benmoriceau Dec 12, 2022
06c0af8
Migrate source to micronaut
benmoriceau Dec 12, 2022
ae5fad3
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/con…
benmoriceau Dec 12, 2022
f801280
convert SchedulerApiController to Micronaut (#20337)
colesnodgrass Dec 12, 2022
9fe2319
Bmoric/convert source definition micronaut (#20338)
benmoriceau Dec 12, 2022
fe595fb
update SourceOauthApiController to Micronaut (#20386)
colesnodgrass Dec 12, 2022
e9484a5
convert WorkspaceApiController to micronaut (#20214)
colesnodgrass Dec 12, 2022
1f257ed
Bmoric/convert jobs micronaut (#20382)
benmoriceau Dec 12, 2022
013135e
Bmoric/convert source definition specification micronaut (#20379)
benmoriceau Dec 12, 2022
38314dc
convert database assert call to Micronaut (#20406)
colesnodgrass Dec 13, 2022
ae2e034
convert NotificationsApiController to Micronaut (#20396)
colesnodgrass Dec 13, 2022
a8c11d2
Migrate logs to micronaut (#20400)
benmoriceau Dec 13, 2022
6a480ef
Bmoric/convert webbackend micronaut (#20403)
benmoriceau Dec 13, 2022
c0682a3
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/con…
benmoriceau Dec 21, 2022
5d13c4c
Cleanup (#20459)
benmoriceau Dec 21, 2022
fcb271d
Delete logs API
benmoriceau Dec 22, 2022
990a681
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/con…
benmoriceau Dec 22, 2022
60cbe90
Revert "Delete logs API"
benmoriceau Dec 23, 2022
a43f2d3
Rm flaky test
benmoriceau Dec 23, 2022
f001bdb
Format
benmoriceau Dec 23, 2022
97dd2df
Try to fix test
benmoriceau Dec 23, 2022
53f30c5
Format
benmoriceau Dec 24, 2022
c0a9ebc
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/con…
benmoriceau Jan 3, 2023
3e2c92f
Remove optional
benmoriceau Jan 3, 2023
f085d82
Rm import
benmoriceau Jan 3, 2023
ddf7228
Test sleep
benmoriceau Jan 3, 2023
33c5842
Simplify injection
benmoriceau Jan 3, 2023
ab6fead
update import
benmoriceau Jan 3, 2023
44caebb
Remove sleep
benmoriceau Jan 3, 2023
d2029e4
More injection
benmoriceau Jan 3, 2023
fb153cc
Remove more requirement
benmoriceau Jan 3, 2023
2f5602e
imports
benmoriceau Jan 3, 2023
bf8586e
Remove more requirement
benmoriceau Jan 3, 2023
250a6b3
Fix yaml
benmoriceau Jan 3, 2023
c390464
Remove unused conf
benmoriceau Jan 3, 2023
624edd9
Add role
benmoriceau Jan 3, 2023
e6e04f1
Test acceptance test
benmoriceau Jan 3, 2023
ad554f1
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/con…
benmoriceau Jan 3, 2023
19a1603
Update env
benmoriceau Jan 4, 2023
09ff988
Revert "Update to Micronaut 3.8.0 (#20716)"
benmoriceau Jan 4, 2023
c183e67
Update helm chart
benmoriceau Jan 4, 2023
446f8c2
Fix helm chart
benmoriceau Jan 4, 2023
d4b4d49
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/con…
benmoriceau Jan 4, 2023
55b1499
Convert Application Listener
benmoriceau Jan 5, 2023
18ebd34
Format
benmoriceau Jan 5, 2023
90e1ebe
Add explicit deployment mode
benmoriceau Jan 5, 2023
95d1c24
Change check port
benmoriceau Jan 5, 2023
f7c1e0c
Update version and bump version to the right value
benmoriceau Jan 5, 2023
dcb85ec
Cleanup
benmoriceau Jan 5, 2023
ecc6798
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/con…
benmoriceau Jan 5, 2023
777270a
Update FE end to end test
benmoriceau Jan 5, 2023
24cd7a8
Allow head request
benmoriceau Jan 5, 2023
d943232
Merge branch 'master' into bmoric/convert-health-micronaut
benmoriceau Jan 5, 2023
80264fd
Merge branch 'master' into bmoric/convert-health-micronaut
benmoriceau Jan 5, 2023
d046c55
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/con…
benmoriceau Jan 6, 2023
2e282b9
Fix controller
benmoriceau Jan 6, 2023
61bf41b
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/con…
benmoriceau Jan 6, 2023
d6c7db6
Format
benmoriceau Jan 6, 2023
cfd8881
Fix http client Bean
benmoriceau Jan 6, 2023
6e2c561
Format
benmoriceau Jan 6, 2023
Merge branch 'master' of github.com:airbytehq/airbyte into bmoric/rm-configuration-api
benmoriceau committed Nov 8, 2022
commit 4582738ce5396c7d5e1d72a0980e6a5b42ed5072
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
@@ -1,5 +1,8 @@
 # Changelog
 
+## 0.7.1
+Low-code: Decouple yaml manifest parsing from the declarative source implementation
+
 ## 0.7.0
 Low-code: Allow connector specifications to be defined in the manifest
 
@@ -0,0 +1,173 @@
+#
+# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+#
+
+import inspect
+import json
+import logging
+import typing
+from dataclasses import dataclass, fields
+from enum import Enum, EnumMeta
+from typing import Any, List, Mapping, Union
+
+from airbyte_cdk.models import ConnectorSpecification
+from airbyte_cdk.sources.declarative.checks import CheckStream
+from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
+from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
+from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
+from airbyte_cdk.sources.declarative.exceptions import InvalidConnectorDefinitionException
+from airbyte_cdk.sources.declarative.parsers.factory import DeclarativeComponentFactory
+from airbyte_cdk.sources.declarative.types import ConnectionDefinition
+from airbyte_cdk.sources.streams.core import Stream
+from dataclasses_jsonschema import JsonSchemaMixin
+from jsonschema.validators import validate
+
+
+@dataclass
+class ConcreteDeclarativeSource(JsonSchemaMixin):
+    version: str
+    checker: CheckStream
+    streams: List[DeclarativeStream]
+
+
+class ManifestDeclarativeSource(DeclarativeSource):
+    """Declarative source defined by a manifest of low-code components that define source connector behavior"""
+
+    VALID_TOP_LEVEL_FIELDS = {"check", "definitions", "spec", "streams", "version"}
+
+    def __init__(self, source_config: ConnectionDefinition):
+        """
+        :param source_config(Mapping[str, Any]): The manifest of low-code components that describe the source connector
+        """
+        self.logger = logging.getLogger(f"airbyte.{self.name}")
+        self._source_config = source_config
+        self._factory = DeclarativeComponentFactory()
+
+        self._validate_source()
+
+        # Stopgap to protect the top-level namespace until it's validated through the schema
+        unknown_fields = [key for key in self._source_config.keys() if key not in self.VALID_TOP_LEVEL_FIELDS]
+        if unknown_fields:
+            raise InvalidConnectorDefinitionException(f"Found unknown top-level fields: {unknown_fields}")
+
+    @property
+    def connection_checker(self) -> ConnectionChecker:
+        check = self._source_config["check"]
+        if "class_name" not in check:
+            check["class_name"] = "airbyte_cdk.sources.declarative.checks.check_stream.CheckStream"
+        return self._factory.create_component(check, dict())(source=self)
+
+    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
+        self._emit_manifest_debug_message(extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)})
+
+        source_streams = [self._factory.create_component(stream_config, config, True)() for stream_config in self._stream_configs()]
+        for stream in source_streams:
+            # make sure the log level is always applied to the stream's logger
+            self._apply_log_level_to_stream_logger(self.logger, stream)
+        return source_streams
+
+    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
+        """
+        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
+        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
+        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
+        in the project root.
+        """
+        self._emit_manifest_debug_message(extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)})
+
+        spec = self._source_config.get("spec")
+        if spec:
+            if "class_name" not in spec:
+                spec["class_name"] = "airbyte_cdk.sources.declarative.spec.Spec"
+            spec_component = self._factory.create_component(spec, dict())()
+            return spec_component.generate_spec()
+        else:
+            return super().spec(logger)
+
+    def _validate_source(self):
+        full_config = {}
+        if "version" in self._source_config:
+            full_config["version"] = self._source_config["version"]
+        if "check" in self._source_config:
+            full_config["checker"] = self._source_config["check"]
+        streams = [self._factory.create_component(stream_config, {}, False)() for stream_config in self._stream_configs()]
+        if len(streams) > 0:
+            full_config["streams"] = streams
+        declarative_source_schema = ConcreteDeclarativeSource.json_schema()
+        validate(full_config, declarative_source_schema)
+
+    def _stream_configs(self):
+        stream_configs = self._source_config.get("streams", [])
+        for s in stream_configs:
+            if "class_name" not in s:
+                s["class_name"] = "airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream"
+        return stream_configs
+
+    @staticmethod
+    def generate_schema() -> str:
+        expanded_source_definition = ManifestDeclarativeSource.expand_schema_interfaces(ConcreteDeclarativeSource, {})
+        expanded_schema = expanded_source_definition.json_schema()
+        return json.dumps(expanded_schema, cls=SchemaEncoder)
+
+    @staticmethod
+    def expand_schema_interfaces(expand_class: type, visited: dict) -> type:
+        """
+        Recursive function that takes in class type that will have its interface fields unpacked and expended and then recursively
+        attempt the same expansion on all the class' underlying fields that are declarative component. It also performs expansion
+        with respect to interfaces that are contained within generic data types.
+        :param expand_class: The declarative component class that will have its interface fields expanded
+        :param visited: cache used to store a record of already visited declarative classes that have already been seen
+        :return: The expanded declarative component
+        """
+
+        # Recursive base case to stop recursion if we have already expanded an interface in case of cyclical components
+        # like CompositeErrorHandler
+        if expand_class.__name__ in visited:
+            return visited[expand_class.__name__]
+        visited[expand_class.__name__] = expand_class
+
+        next_classes = []
+        class_fields = fields(expand_class)
+        for field in class_fields:
+            unpacked_field_types = DeclarativeComponentFactory.unpack(field.type)
+            expand_class.__annotations__[field.name] = unpacked_field_types
+            next_classes.extend(ManifestDeclarativeSource._get_next_expand_classes(field.type))
+        for next_class in next_classes:
+            ManifestDeclarativeSource.expand_schema_interfaces(next_class, visited)
+        return expand_class
+
+    @staticmethod
+    def _get_next_expand_classes(field_type) -> list[type]:
+        """
+        Parses through a given field type and assembles a list of all underlying declarative components. For a concrete declarative class
+        it will return itself. For a declarative interface it will return its subclasses. For declarative components in a generic type
+        it will return the unpacked classes. Any non-declarative types will be skipped.
+        :param field_type: A field type that
+        :return:
+        """
+        generic_type = typing.get_origin(field_type)
+        if generic_type is None:
+            # We can only continue parsing declarative that inherit from the JsonSchemaMixin class because it is used
+            # to generate the final json schema
+            if inspect.isclass(field_type) and issubclass(field_type, JsonSchemaMixin) and not isinstance(field_type, EnumMeta):
+                subclasses = field_type.__subclasses__()
+                if subclasses:
+                    return subclasses
+                else:
+                    return [field_type]
+        elif generic_type == list or generic_type == Union:
+            next_classes = []
+            for underlying_type in typing.get_args(field_type):
+                next_classes.extend(ManifestDeclarativeSource._get_next_expand_classes(underlying_type))
+            return next_classes
+        return []
+
+    def _emit_manifest_debug_message(self, extra_args: dict):
+        self.logger.debug("declarative source created from manifest", extra=extra_args)
+
+
+class SchemaEncoder(json.JSONEncoder):
+    def default(self, obj):
+        if isinstance(obj, property) or isinstance(obj, Enum):
+            return str(obj)
+        return json.JSONEncoder.default(self, obj)
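
Reviewer note: the type-walking step described in the `_get_next_expand_classes` docstring can be tried in isolation. The sketch below is an illustration only, not the CDK code. `Component`, `Paginator`, `CursorPaginator`, `OffsetPaginator`, `Retriever` and `next_expand_classes` are hypothetical names, and `Component` stands in for `JsonSchemaMixin` so that the example runs on the standard library alone. The enum exclusion from the diff is left out for brevity.

```python
import inspect
import typing
from dataclasses import dataclass
from typing import List, Union


class Component:
    """Hypothetical stand-in for the JsonSchemaMixin base used in the diff."""


class Paginator(Component):
    """Plays the role of a declarative interface that has concrete subclasses."""


class CursorPaginator(Paginator):
    pass


class OffsetPaginator(Paginator):
    pass


@dataclass
class Retriever(Component):
    """Concrete component with no subclasses."""

    name: str = "retriever"


def next_expand_classes(field_type) -> list:
    """Collect the component classes reachable from one field annotation."""
    origin = typing.get_origin(field_type)
    if origin is None:
        # Plain (non-generic) annotation: only component classes are kept.
        if inspect.isclass(field_type) and issubclass(field_type, Component):
            # An interface expands to its direct subclasses, a concrete class to itself.
            return field_type.__subclasses__() or [field_type]
        return []
    if origin is list or origin is Union:
        # Generic container: recurse into every type argument.
        found = []
        for arg in typing.get_args(field_type):
            found.extend(next_expand_classes(arg))
        return found
    # Any other generic (dict, tuple, ...) is skipped, as in the diff.
    return []
```

Under these assumptions, `next_expand_classes(List[Paginator])` yields the two paginator subclasses, while a non-component annotation such as `int` yields an empty list. The real method additionally relies on the `visited` cache in `expand_schema_interfaces` to stop on cyclical components.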
@@ -2,184 +2,31 @@
 # Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 #
 
-import inspect
-import json
-import logging
 import pkgutil
-import typing
-from dataclasses import dataclass, fields
-from enum import Enum, EnumMeta
-from typing import Any, List, Mapping, Union
 
-from airbyte_cdk.models import ConnectorSpecification
-from airbyte_cdk.sources.declarative.checks import CheckStream
-from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
-from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
-from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
-from airbyte_cdk.sources.declarative.exceptions import InvalidConnectorDefinitionException
-from airbyte_cdk.sources.declarative.parsers.factory import DeclarativeComponentFactory
+from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
 from airbyte_cdk.sources.declarative.parsers.yaml_parser import YamlParser
-from airbyte_cdk.sources.streams.core import Stream
-from dataclasses_jsonschema import JsonSchemaMixin
-from jsonschema.validators import validate
+from airbyte_cdk.sources.declarative.types import ConnectionDefinition
 
 
-@dataclass
-class ConcreteDeclarativeSource(JsonSchemaMixin):
-    version: str
-    checker: CheckStream
-    streams: List[DeclarativeStream]
-
-
-class YamlDeclarativeSource(DeclarativeSource):
+class YamlDeclarativeSource(ManifestDeclarativeSource):
     """Declarative source defined by a yaml file"""
 
-    VALID_TOP_LEVEL_FIELDS = {"check", "definitions", "spec", "streams", "version"}
-
     def __init__(self, path_to_yaml):
         """
         :param path_to_yaml: Path to the yaml file describing the source
         """
-        self.logger = logging.getLogger(f"airbyte.{self.name}")
-        self._factory = DeclarativeComponentFactory()
         self._path_to_yaml = path_to_yaml
-        self._source_config = self._read_and_parse_yaml_file(path_to_yaml)
-
-        self._validate_source()
-
-        # Stopgap to protect the top-level namespace until it's validated through the schema
-        unknown_fields = [key for key in self._source_config.keys() if key not in self.VALID_TOP_LEVEL_FIELDS]
-        if unknown_fields:
-            raise InvalidConnectorDefinitionException(f"Found unknown top-level fields: {unknown_fields}")
-
-    @property
-    def connection_checker(self) -> ConnectionChecker:
-        check = self._source_config["check"]
-        if "class_name" not in check:
-            check["class_name"] = "airbyte_cdk.sources.declarative.checks.check_stream.CheckStream"
-        return self._factory.create_component(check, dict())(source=self)
-
-    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
-        self.logger.debug(
-            "parsed YAML into declarative source",
-            extra={"path_to_yaml_file": self._path_to_yaml, "source_name": self.name, "parsed_config": json.dumps(self._source_config)},
-        )
-        source_streams = [self._factory.create_component(stream_config, config, True)() for stream_config in self._stream_configs()]
-        for stream in source_streams:
-            # make sure the log level is always appied to the stream's logger
-            self._apply_log_level_to_stream_logger(self.logger, stream)
-        return source_streams
-
-    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
-        """
-        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
-        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
-        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
-        in the project root.
-        """
-
-        self.logger.debug(
-            "parsed YAML into declarative source",
-            extra={"path_to_yaml_file": self._path_to_yaml, "source_name": self.name, "parsed_config": json.dumps(self._source_config)},
-        )
-
-        spec = self._source_config.get("spec")
-        if spec:
-            if "class_name" not in spec:
-                spec["class_name"] = "airbyte_cdk.sources.declarative.spec.Spec"
-            spec_component = self._factory.create_component(spec, dict())()
-            return spec_component.generate_spec()
-        else:
-            return super().spec(logger)
+        source_config = self._read_and_parse_yaml_file(path_to_yaml)
+        super().__init__(source_config)
 
-    def _read_and_parse_yaml_file(self, path_to_yaml_file):
+    def _read_and_parse_yaml_file(self, path_to_yaml_file) -> ConnectionDefinition:
         package = self.__class__.__module__.split(".")[0]
 
         yaml_config = pkgutil.get_data(package, path_to_yaml_file)
         decoded_yaml = yaml_config.decode()
         return YamlParser().parse(decoded_yaml)
 
-    def _validate_source(self):
-        full_config = {}
-        if "version" in self._source_config:
-            full_config["version"] = self._source_config["version"]
-        if "check" in self._source_config:
-            full_config["checker"] = self._source_config["check"]
-        streams = [self._factory.create_component(stream_config, {}, False)() for stream_config in self._stream_configs()]
-        if len(streams) > 0:
-            full_config["streams"] = streams
-        declarative_source_schema = ConcreteDeclarativeSource.json_schema()
-        validate(full_config, declarative_source_schema)
-
-    def _stream_configs(self):
-        stream_configs = self._source_config.get("streams", [])
-        for s in stream_configs:
-            if "class_name" not in s:
-                s["class_name"] = "airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream"
-        return stream_configs
-
-    @staticmethod
-    def generate_schema() -> str:
-        expanded_source_definition = YamlDeclarativeSource.expand_schema_interfaces(ConcreteDeclarativeSource, {})
-        expanded_schema = expanded_source_definition.json_schema()
-        return json.dumps(expanded_schema, cls=SchemaEncoder)
-
-    @staticmethod
-    def expand_schema_interfaces(expand_class: type, visited: dict) -> type:
-        """
-        Recursive function that takes in class type that will have its interface fields unpacked and expended and then recursively
-        attempt the same expansion on all the class' underlying fields that are declarative component. It also performs expansion
-        with respect to interfaces that are contained within generic data types.
-        :param expand_class: The declarative component class that will have its interface fields expanded
-        :param visited: cache used to store a record of already visited declarative classes that have already been seen
-        :return: The expanded declarative component
-        """
-
-        # Recursive base case to stop recursion if we have already expanded an interface in case of cyclical components
-        # like CompositeErrorHandler
-        if expand_class.__name__ in visited:
-            return visited[expand_class.__name__]
-        visited[expand_class.__name__] = expand_class
-
-        next_classes = []
-        class_fields = fields(expand_class)
-        for field in class_fields:
-            unpacked_field_types = DeclarativeComponentFactory.unpack(field.type)
-            expand_class.__annotations__[field.name] = unpacked_field_types
-            next_classes.extend(YamlDeclarativeSource._get_next_expand_classes(field.type))
-        for next_class in next_classes:
-            YamlDeclarativeSource.expand_schema_interfaces(next_class, visited)
-        return expand_class
-
-    @staticmethod
-    def _get_next_expand_classes(field_type) -> list[type]:
-        """
-        Parses through a given field type and assembles a list of all underlying declarative components. For a concrete declarative class
-        it will return itself. For a declarative interface it will return its subclasses. For declarative components in a generic type
-        it will return the unpacked classes. Any non-declarative types will be skipped.
-        :param field_type: A field type that
-        :return:
-        """
-        generic_type = typing.get_origin(field_type)
-        if generic_type is None:
-            # We can only continue parsing declarative that inherit from the JsonSchemaMixin class because it is used
-            # to generate the final json schema
-            if inspect.isclass(field_type) and issubclass(field_type, JsonSchemaMixin) and not isinstance(field_type, EnumMeta):
-                subclasses = field_type.__subclasses__()
-                if subclasses:
-                    return subclasses
-                else:
-                    return [field_type]
-        elif generic_type == list or generic_type == Union:
-            next_classes = []
-            for underlying_type in typing.get_args(field_type):
-                next_classes.extend(YamlDeclarativeSource._get_next_expand_classes(underlying_type))
-            return next_classes
-        return []
-
-
-class SchemaEncoder(json.JSONEncoder):
-    def default(self, obj):
-        if isinstance(obj, property) or isinstance(obj, Enum):
-            return str(obj)
-        return json.JSONEncoder.default(self, obj)
+    def _emit_manifest_debug_message(self, extra_args: dict):
+        extra_args["path_to_yaml"] = self._path_to_yaml
+        self.logger.debug("declarative source created from parsed YAML manifest", extra=extra_args)
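
Reviewer note: the shape of this refactor (a base class built from an already-parsed manifest, and a thin subclass that only loads the manifest and extends the debug payload) can be sketched without the CDK. Everything below is hypothetical: `ManifestSource`, `JsonTextSource`, `debug_extra` and `_extend_debug_extra` are illustrative names, JSON text stands in for the YAML file so that only the standard library is needed, and `ValueError` stands in for `InvalidConnectorDefinitionException`.

```python
import json
import logging
from typing import Any, Mapping


class ManifestSource:
    """Hypothetical base: built from an already-parsed manifest mapping."""

    VALID_TOP_LEVEL_FIELDS = {"check", "definitions", "spec", "streams", "version"}

    def __init__(self, manifest: Mapping[str, Any]):
        self.logger = logging.getLogger("example.manifest_source")
        self._manifest = manifest
        # Same stopgap idea as in the diff: reject unknown top-level keys early.
        unknown = [key for key in manifest if key not in self.VALID_TOP_LEVEL_FIELDS]
        if unknown:
            raise ValueError(f"Found unknown top-level fields: {unknown}")

    def debug_extra(self) -> dict:
        """Build the structured logging payload, then let subclasses extend it."""
        extra = {"parsed_config": json.dumps(self._manifest)}
        return self._extend_debug_extra(extra)

    def _extend_debug_extra(self, extra: dict) -> dict:
        # Hook method: the base class adds nothing.
        return extra


class JsonTextSource(ManifestSource):
    """Hypothetical subclass: only knows how to turn text into a manifest."""

    def __init__(self, origin: str, text: str):
        # Set before calling the base constructor, mirroring _path_to_yaml in the diff.
        self._origin = origin
        super().__init__(json.loads(text))

    def _extend_debug_extra(self, extra: dict) -> dict:
        extra["origin"] = self._origin
        return extra
```

With this split, validation and logging live in one place, and a new manifest format would only need another small subclass. Whether the CDK intends further subclasses is not stated in this commit.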
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
@@ -15,7 +15,7 @@
 
 setup(
     name="airbyte-cdk",
-    version="0.7.0",
+    version="0.7.1",
     description="A framework for writing Airbyte Connectors.",
     long_description=README,
     long_description_content_type="text/markdown",
You are viewing a condensed version of this merge commit. You can view the full changes here.