Skip to content

Commit c0ae1c0

Browse files
authored
fix: move schema loader decorator to model_to_component_factory (#815)
1 parent 59e4163 commit c0ae1c0

File tree

5 files changed: +45 additions, -25 deletions

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -551,9 +551,13 @@
551551
DynamicSchemaLoader,
552552
InlineSchemaLoader,
553553
JsonFileSchemaLoader,
554+
SchemaLoader,
554555
SchemaTypeIdentifier,
555556
TypesMap,
556557
)
558+
from airbyte_cdk.sources.declarative.schema.caching_schema_loader_decorator import (
559+
CachingSchemaLoaderDecorator,
560+
)
557561
from airbyte_cdk.sources.declarative.schema.composite_schema_loader import CompositeSchemaLoader
558562
from airbyte_cdk.sources.declarative.spec import ConfigMigration, Spec
559563
from airbyte_cdk.sources.declarative.stream_slicers import (
@@ -2095,13 +2099,7 @@ def create_default_stream(
20952099
if isinstance(retriever, AsyncRetriever):
20962100
stream_slicer = retriever.stream_slicer
20972101

2098-
schema_loader: Union[
2099-
CompositeSchemaLoader,
2100-
DefaultSchemaLoader,
2101-
DynamicSchemaLoader,
2102-
InlineSchemaLoader,
2103-
JsonFileSchemaLoader,
2104-
]
2102+
schema_loader: SchemaLoader
21052103
if model.schema_loader and isinstance(model.schema_loader, list):
21062104
nested_schema_loaders = [
21072105
self._create_component_from_model(model=nested_schema_loader, config=config)
@@ -2120,6 +2118,7 @@ def create_default_stream(
21202118
if "name" not in options:
21212119
options["name"] = model.name
21222120
schema_loader = DefaultSchemaLoader(config=config, parameters=options)
2121+
schema_loader = CachingSchemaLoaderDecorator(schema_loader)
21232122

21242123
stream_name = model.name or ""
21252124
return DefaultStream(
airbyte_cdk/sources/declarative/schema/caching_schema_loader_decorator.py (new file)

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
from typing import Any, Mapping, Optional
2+
3+
from airbyte_cdk.sources.declarative.schema import SchemaLoader
4+
5+
6+
class CachingSchemaLoaderDecorator(SchemaLoader):
7+
def __init__(self, schema_loader: SchemaLoader):
8+
self._decorated = schema_loader
9+
self._loaded_schema: Optional[Mapping[str, Any]] = None
10+
11+
def get_json_schema(self) -> Mapping[str, Any]:
12+
if self._loaded_schema is None:
13+
self._loaded_schema = self._decorated.get_json_schema()
14+
15+
return self._loaded_schema # type: ignore # at that point, we assume the schema will be populated

airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,18 +31,6 @@ def get_total_records(self) -> int:
3131
return self.total_record_counter
3232

3333

34-
class SchemaLoaderCachingDecorator(SchemaLoader):
35-
def __init__(self, schema_loader: SchemaLoader):
36-
self._decorated = schema_loader
37-
self._loaded_schema: Optional[Mapping[str, Any]] = None
38-
39-
def get_json_schema(self) -> Mapping[str, Any]:
40-
if self._loaded_schema is None:
41-
self._loaded_schema = self._decorated.get_json_schema()
42-
43-
return self._loaded_schema # type: ignore # at that point, we assume the schema will be populated
44-
45-
4634
class DeclarativePartitionFactory:
4735
def __init__(
4836
self,
@@ -58,7 +46,7 @@ def __init__(
5846
In order to avoid these problems, we will create one retriever per thread which should make the processing thread-safe.
5947
"""
6048
self._stream_name = stream_name
61-
self._schema_loader = SchemaLoaderCachingDecorator(schema_loader)
49+
self._schema_loader = schema_loader
6250
self._retriever = retriever
6351
self._message_repository = message_repository
6452
self._max_records_limit = max_records_limit

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -157,13 +157,12 @@
157157
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
158158
from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, SimpleRetriever
159159
from airbyte_cdk.sources.declarative.schema import InlineSchemaLoader, JsonFileSchemaLoader
160+
from airbyte_cdk.sources.declarative.schema.caching_schema_loader_decorator import (
161+
CachingSchemaLoaderDecorator,
162+
)
160163
from airbyte_cdk.sources.declarative.schema.composite_schema_loader import CompositeSchemaLoader
161164
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
162165
from airbyte_cdk.sources.declarative.spec import Spec
163-
from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicerTestReadDecorator
164-
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
165-
SchemaLoaderCachingDecorator,
166-
)
167166
from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields
168167
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
169168
from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import (
@@ -5107,7 +5106,7 @@ def test_create_stream_with_multiple_schema_loaders():
51075106
def get_schema_loader(stream: DefaultStream):
51085107
assert isinstance(
51095108
stream._stream_partition_generator._partition_factory._schema_loader,
5110-
SchemaLoaderCachingDecorator,
5109+
CachingSchemaLoaderDecorator,
51115110
)
51125111
return stream._stream_partition_generator._partition_factory._schema_loader._decorated
51135112

[new unit-test file for CachingSchemaLoaderDecorator — file path not captured in this extract]

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from unittest import TestCase
2+
from unittest.mock import Mock
3+
4+
from airbyte_cdk.sources.declarative.schema import SchemaLoader
5+
from airbyte_cdk.sources.declarative.schema.caching_schema_loader_decorator import (
6+
CachingSchemaLoaderDecorator,
7+
)
8+
9+
10+
class CachingSchemaLoaderDecoratorTest(TestCase):
11+
def test_given_previous_calls_when_get_json_schema_then_return_cached_schema(self):
12+
decorated = Mock(spec=SchemaLoader)
13+
schema_loader = CachingSchemaLoaderDecorator(decorated)
14+
15+
schema_loader.get_json_schema()
16+
schema_loader.get_json_schema()
17+
schema_loader.get_json_schema()
18+
19+
assert decorated.get_json_schema.call_count == 1

0 commit comments

Comments
 (0)