Skip to content

fix: (CDK) (Manifest) - Add Manifest Normalization module (reduce commonalities + handle schema $refs) #447

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 37 commits into from
Apr 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
a488ab3
deduplication version 1
bazarnov Mar 26, 2025
7d910ee
deduplication version 2
bazarnov Mar 26, 2025
691d16a
updated duplicates collection
bazarnov Mar 27, 2025
081e7a8
deduplicate most frequent tags, use existing refs if definitions.shar…
bazarnov Mar 31, 2025
180af86
Merge remote-tracking branch 'origin/main' into baz/cdk/extract-commo…
bazarnov Mar 31, 2025
138b607
formatted"
bazarnov Mar 31, 2025
f10e601
updated to account type for the given duplicated key
bazarnov Mar 31, 2025
66fe38e
add the reduce_commons: true, for Connector Builder case
bazarnov Mar 31, 2025
8798042
enabled the reduce_commons: True for Connector Builder case
bazarnov Mar 31, 2025
1d425ee
refactorred and cleaned up the code, moved to use the class instead
bazarnov Apr 1, 2025
06b183a
formatted
bazarnov Apr 1, 2025
1fa891c
formatted
bazarnov Apr 1, 2025
00e31a7
cleaned up
bazarnov Apr 1, 2025
a5aba82
added the dedicated tests
bazarnov Apr 1, 2025
e017e92
Merge remote-tracking branch 'origin/main' into baz/cdk/extract-commo…
bazarnov Apr 1, 2025
0e8394f
Merge remote-tracking branch 'origin/main' into baz/cdk/extract-commo…
bazarnov Apr 2, 2025
9f7d498
formatted
bazarnov Apr 2, 2025
6ec240a
updated normalizer
bazarnov Apr 8, 2025
acdecdb
Merge remote-tracking branch 'origin/main' into baz/cdk/extract-commo…
bazarnov Apr 8, 2025
5f5c6b1
attempt to fix the Connector Builder tests
bazarnov Apr 8, 2025
e97afa5
Merge remote-tracking branch 'origin/main' into baz/cdk/extract-commo…
bazarnov Apr 11, 2025
be3bab1
revert test
bazarnov Apr 11, 2025
748892d
Merge remote-tracking branch 'origin/main' into baz/cdk/extract-commo…
bazarnov Apr 15, 2025
b10d7a1
removed post_resolve_manifest flag
bazarnov Apr 15, 2025
0587481
nit
bazarnov Apr 15, 2025
d929167
add _-should_normalize flag handling
bazarnov Apr 17, 2025
3859c5b
Merge remote-tracking branch 'origin/main' into baz/cdk/extract-commo…
bazarnov Apr 17, 2025
9de27ef
formatted
bazarnov Apr 17, 2025
c403a0e
rename sharable > linkable, shared > linked
bazarnov Apr 17, 2025
297ae37
Merge remote-tracking branch 'origin/main' into baz/cdk/extract-commo…
bazarnov Apr 19, 2025
38f7da6
updated the order of operations; normalization should go after pre-pr…
bazarnov Apr 19, 2025
7d71f4b
fixed
bazarnov Apr 19, 2025
304235c
add schema extraction + unit test
bazarnov Apr 21, 2025
348aaae
Merge branch 'main' into baz/cdk/extract-common-manifest-parts
bazarnov Apr 23, 2025
2c8d164
updated test comments
bazarnov Apr 24, 2025
2010419
Merge remote-tracking branch 'origin/main' into baz/cdk/extract-commo…
bazarnov Apr 25, 2025
8d7be4e
updated linked
bazarnov Apr 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions airbyte_cdk/connector_builder/connector_builder_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,22 @@ def get_limits(config: Mapping[str, Any]) -> TestLimits:
return TestLimits(max_records, max_pages_per_slice, max_slices, max_streams)


def should_normalize_manifest(config: Mapping[str, Any]) -> bool:
"""
Check if the manifest should be normalized.
:param config: The configuration to check
:return: True if the manifest should be normalized, False otherwise.
"""
return config.get("__should_normalize", False)


def create_source(config: Mapping[str, Any], limits: TestLimits) -> ManifestDeclarativeSource:
manifest = config["__injected_declarative_manifest"]
return ManifestDeclarativeSource(
config=config,
emit_connector_builder_messages=True,
source_config=manifest,
normalize_manifest=should_normalize_manifest(config),
component_factory=ModelToComponentFactory(
emit_connector_builder_messages=True,
limit_pages_fetched_per_slice=limits.max_pages_per_slice,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1917,8 +1917,9 @@ definitions:
type: string
enum: [HttpRequester]
url_base:
linkable: true
title: API Base URL
description: Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.
description: The Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.
type: string
interpolation_context:
- config
Expand All @@ -1936,7 +1937,7 @@ definitions:
- "https://example.com/api/v1/resource/{{ next_page_token['id'] }}"
path:
title: URL Path
description: Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.
description: The Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.
type: string
interpolation_context:
- config
Expand Down Expand Up @@ -1964,6 +1965,7 @@ definitions:
- POST
authenticator:
title: Authenticator
linkable: true
description: Authentication method to use for requests sent to the API.
anyOf:
- "$ref": "#/definitions/NoAuth"
Expand Down
160 changes: 122 additions & 38 deletions airbyte_cdk/sources/declarative/manifest_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
DeclarativeStream as DeclarativeStreamModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
Spec as SpecModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
StateDelegatingStream as StateDelegatingStreamModel,
)
Expand All @@ -39,6 +41,9 @@
from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import (
ManifestComponentTransformer,
)
from airbyte_cdk.sources.declarative.parsers.manifest_normalizer import (
ManifestNormalizer,
)
from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import (
ManifestReferenceResolver,
)
Expand All @@ -57,6 +62,24 @@
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


def _get_declarative_component_schema() -> Dict[str, Any]:
try:
raw_component_schema = pkgutil.get_data(
"airbyte_cdk", "sources/declarative/declarative_component_schema.yaml"
)
if raw_component_schema is not None:
declarative_component_schema = yaml.load(raw_component_schema, Loader=yaml.SafeLoader)
return declarative_component_schema # type: ignore
else:
raise RuntimeError(
"Failed to read manifest component json schema required for deduplication"
)
except FileNotFoundError as e:
raise FileNotFoundError(
f"Failed to read manifest component json schema required for deduplication: {e}"
)


class ManifestDeclarativeSource(DeclarativeSource):
"""Declarative source defined by a manifest of low-code components that define source connector behavior"""

Expand All @@ -68,29 +91,25 @@ def __init__(
debug: bool = False,
emit_connector_builder_messages: bool = False,
component_factory: Optional[ModelToComponentFactory] = None,
):
normalize_manifest: Optional[bool] = False,
) -> None:
"""
Args:
config: The provided config dict.
source_config: The manifest of low-code components that describe the source connector.
debug: True if debug mode is enabled.
emit_connector_builder_messages: True if messages should be emitted to the connector builder.
component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
normalize_manifest: Optional flag to indicate if the manifest should be normalized.
"""
self.logger = logging.getLogger(f"airbyte.{self.name}")
# For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
manifest = dict(source_config)
if "type" not in manifest:
manifest["type"] = "DeclarativeSource"

self._should_normalize = normalize_manifest
self._declarative_component_schema = _get_declarative_component_schema()
# If custom components are needed, locate and/or register them.
self.components_module: ModuleType | None = get_registered_components_module(config=config)
# resolve all components in the manifest
self._source_config = self._preprocess_manifest(dict(source_config))

resolved_source_config = ManifestReferenceResolver().preprocess_manifest(manifest)
propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters(
"", resolved_source_config, {}
)
self._source_config = propagated_source_config
self._debug = debug
self._emit_connector_builder_messages = emit_connector_builder_messages
self._constructor = (
Expand All @@ -105,22 +124,91 @@ def __init__(
self._slice_logger: SliceLogger = (
AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
)

self._config = config or {}

# validate resolved manifest against the declarative component schema
self._validate_source()

# apply additional post-processing to the manifest
self._postprocess_manifest()

@property
def resolved_manifest(self) -> Mapping[str, Any]:
"""
Returns the resolved manifest configuration for the source.

This property provides access to the internal source configuration as a mapping,
which contains all settings and parameters required to define the source's behavior.

Returns:
Mapping[str, Any]: The resolved source configuration manifest.
"""
return self._source_config

def _preprocess_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
"""
Preprocesses the provided manifest dictionary by resolving any manifest references.

This method modifies the input manifest in place, resolving references using the
ManifestReferenceResolver to ensure all references within the manifest are properly handled.

Args:
manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in.

Returns:
None
"""
# For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
manifest = self._fix_source_type(manifest)
# Resolve references in the manifest
resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest)
# Propagate types and parameters throughout the manifest
propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters(
"", resolved_manifest, {}
)

return propagated_manifest

def _postprocess_manifest(self) -> None:
"""
Post-processes the manifest after validation.
This method is responsible for any additional modifications or transformations needed
after the manifest has been validated and before it is used in the source.
"""
# apply manifest normalization, if required
self._normalize_manifest()

def _normalize_manifest(self) -> None:
"""
This method is used to normalize the manifest. It should be called after the manifest has been validated.

Connector Builder UI rendering requires the manifest to be in a specific format.
- references have been resolved
- the commonly used definitions are extracted to the `definitions.linked.*`
"""
if self._should_normalize:
normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
self._source_config = normalizer.normalize()

def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
"""
Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest.
"""
if "type" not in manifest:
manifest["type"] = "DeclarativeSource"

return manifest

@property
def message_repository(self) -> MessageRepository:
return self._message_repository

@property
def dynamic_streams(self) -> List[Dict[str, Any]]:
return self._dynamic_stream_configs(
manifest=self._source_config, config=self._config, with_dynamic_stream_name=True
manifest=self._source_config,
config=self._config,
with_dynamic_stream_name=True,
)

@property
Expand All @@ -143,7 +231,10 @@ def connection_checker(self) -> ConnectionChecker:

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
self._emit_manifest_debug_message(
extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
extra_args={
"source_name": self.name,
"parsed_config": json.dumps(self._source_config),
}
)

stream_configs = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
Expand All @@ -156,9 +247,11 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:

source_streams = [
self._constructor.create_component(
StateDelegatingStreamModel
if stream_config.get("type") == StateDelegatingStreamModel.__name__
else DeclarativeStreamModel,
(
StateDelegatingStreamModel
if stream_config.get("type") == StateDelegatingStreamModel.__name__
else DeclarativeStreamModel
),
stream_config,
config,
emit_connector_builder_messages=self._emit_connector_builder_messages,
Expand All @@ -174,7 +267,9 @@ def _initialize_cache_for_parent_streams(
) -> List[Dict[str, Any]]:
parent_streams = set()

def update_with_cache_parent_configs(parent_configs: list[dict[str, Any]]) -> None:
def update_with_cache_parent_configs(
parent_configs: list[dict[str, Any]],
) -> None:
for parent_config in parent_configs:
parent_streams.add(parent_config["stream"]["name"])
if parent_config["stream"]["type"] == "StateDelegatingStream":
Expand Down Expand Up @@ -229,7 +324,10 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:
"""
self._configure_logger_level(logger)
self._emit_manifest_debug_message(
extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
extra_args={
"source_name": self.name,
"parsed_config": json.dumps(self._source_config),
}
)

spec = self._source_config.get("spec")
Expand Down Expand Up @@ -266,25 +364,9 @@ def _validate_source(self) -> None:
"""
Validates the connector manifest against the declarative component schema
"""
try:
raw_component_schema = pkgutil.get_data(
"airbyte_cdk", "sources/declarative/declarative_component_schema.yaml"
)
if raw_component_schema is not None:
declarative_component_schema = yaml.load(
raw_component_schema, Loader=yaml.SafeLoader
)
else:
raise RuntimeError(
"Failed to read manifest component json schema required for validation"
)
except FileNotFoundError as e:
raise FileNotFoundError(
f"Failed to read manifest component json schema required for validation: {e}"
)

try:
validate(self._source_config, declarative_component_schema)
validate(self._source_config, self._declarative_component_schema)
except ValidationError as e:
raise ValidationError(
"Validation against json schema defined in declarative_component_schema.yaml schema failed"
Expand Down Expand Up @@ -382,7 +464,9 @@ def _dynamic_stream_configs(

# Create a resolver for dynamic components based on type
components_resolver = self._constructor.create_component(
COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], components_resolver_config, config
COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
components_resolver_config,
config,
)

stream_template_config = dynamic_definition["stream_template"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2209,7 +2209,7 @@ class HttpRequester(BaseModel):
type: Literal["HttpRequester"]
url_base: str = Field(
...,
description="Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.",
description="The Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.",
examples=[
"https://connect.squareup.com/v2",
"{{ config['base_url'] or 'https://app.posthog.com'}}/api",
Expand All @@ -2220,7 +2220,7 @@ class HttpRequester(BaseModel):
)
path: Optional[str] = Field(
None,
description="Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.",
description="The Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.",
examples=[
"/products",
"/quotes/{{ stream_partition['id'] }}/quote_line_groups",
Expand Down
9 changes: 9 additions & 0 deletions airbyte_cdk/sources/declarative/parsers/custom_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,12 @@ class UndefinedReferenceException(Exception):

def __init__(self, path: str, reference: str) -> None:
super().__init__(f"Undefined reference {reference} from {path}")


class ManifestNormalizationException(Exception):
"""
Raised when a circular reference is detected in a manifest.
"""

def __init__(self, message: str) -> None:
super().__init__(f"Failed to deduplicate manifest: {message}")
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import copy
import typing
from typing import Any, Mapping, Optional
from typing import Any, Dict, Mapping, Optional

PARAMETERS_STR = "$parameters"

Expand Down Expand Up @@ -95,7 +95,7 @@ def propagate_types_and_parameters(
declarative_component: Mapping[str, Any],
parent_parameters: Mapping[str, Any],
use_parent_parameters: Optional[bool] = None,
) -> Mapping[str, Any]:
) -> Dict[str, Any]:
"""
Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
default component type if it was not already present. The resulting transformed components are a deep copy of the input
Expand Down
Loading
Loading