fix: move schema loader decorator to model_to_component_factory #815
Conversation
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@issue_14931/call_dynamic_schema_loader_only_once#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch issue_14931/call_dynamic_schema_loader_only_once

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:
PyTest Results (Fast): 3,817 tests, 3,805 ✅, 6m 35s ⏱️. Results for commit 3583558. ♻️ This comment has been updated with latest results.
📝 Walkthrough

This change adds a standalone CachingSchemaLoaderDecorator, applies it centrally in ModelToComponentFactory.create_default_stream (wrapping each constructed schema loader), replaces a union type hint with the base SchemaLoader type, and removes the prior caching class from DeclarativePartitionFactory. A unit test for the new decorator is included.
Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Factory as ModelToComponentFactory
  participant Loader as ConcreteSchemaLoader
  participant Cache as CachingSchemaLoaderDecorator
  participant Stream as StreamComponent
  rect rgb(238,245,255)
    Factory->>Loader: construct schema loader (various concrete types)
    Loader-->>Factory: schema_loader instance
    Factory->>Cache: wrap schema_loader -> CachingSchemaLoaderDecorator(schema_loader)
    Cache-->>Factory: cached_loader
    Factory->>Stream: inject cached_loader into stream
  end
  rect rgb(238,255,238)
    Stream->>Cache: get_json_schema()
    alt first call
      Cache->>Loader: get_json_schema()
      Loader-->>Cache: schema (store in cache)
      Cache-->>Stream: schema
    else subsequent calls
      Cache-->>Stream: cached schema
    end
  end
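For orientation, here is a minimal sketch of what a decorator like this might look like, pieced together from the walkthrough, the diagram above, and the review comments further down (the module path, method name, and the `_decorated`/`_loaded_schema` attribute names come from those comments; everything else is an assumption, not the PR's actual code):

```python
from typing import Any, Mapping, Optional

from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader


class CachingSchemaLoaderDecorator(SchemaLoader):
    """Wraps another SchemaLoader and caches the first schema it returns."""

    def __init__(self, decorated: SchemaLoader) -> None:
        self._decorated = decorated
        self._loaded_schema: Optional[Mapping[str, Any]] = None

    def get_json_schema(self) -> Mapping[str, Any]:
        # Delegate to the wrapped loader only on the first call; reuse the
        # cached schema on every call after that.
        if self._loaded_schema is None:
            self._loaded_schema = self._decorated.get_json_schema()
        return self._loaded_schema
```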
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs
 Suggested labels
 Suggested reviewers
Wdyt?

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
 ✅ Passed checks (2 passed)
 ✨ Finishing touches
Actionable comments posted: 0
🧹 Nitpick comments (1)
unit_tests/sources/declarative/schema/test_caching_schema_loader_decorator.py (1)
8-17: LGTM! Test validates core caching behavior.

The test correctly verifies that the decorator caches the schema after the first call, ensuring the underlying loader's get_json_schema is invoked only once despite multiple calls.

For more comprehensive coverage, you might consider adding these test cases (totally optional, the current test is sufficient for basic validation):
- Test that the returned schema is correct (not just that caching works)
- Test behavior when the underlying loader returns different types (empty dict, None if that's possible, etc.)
- Test with actual schema loader implementations rather than just mocks
Wdyt? These are just nice-to-haves, not blockers.
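As an illustration of the first nice-to-have above, an extra test could assert the returned value as well as the call count, mirroring the mock-based style of the existing test (the test name and the example schema literal are made up for illustration; the import paths follow the files listed in the review details below):

```python
from unittest.mock import Mock

from airbyte_cdk.sources.declarative.schema.caching_schema_loader_decorator import (
    CachingSchemaLoaderDecorator,
)
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader


def test_returns_the_underlying_schema_on_every_call():
    # Hypothetical extra test: the decorator should return the wrapped
    # loader's schema, whether it comes from the cache or the first load.
    expected_schema = {"type": "object", "properties": {"id": {"type": "string"}}}
    decorated = Mock(spec=SchemaLoader)
    decorated.get_json_schema.return_value = expected_schema

    schema_loader = CachingSchemaLoaderDecorator(decorated)

    assert schema_loader.get_json_schema() == expected_schema
    assert schema_loader.get_json_schema() == expected_schema  # served from the cache
    assert decorated.get_json_schema.call_count == 1
```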
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3 hunks)
- airbyte_cdk/sources/declarative/schema/caching_schema_loader_decorator.py (1 hunk)
- airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1 hunk)
- unit_tests/sources/declarative/schema/test_caching_schema_loader_decorator.py (1 hunk)
🧰 Additional context used
🧬 Code graph analysis (4)
unit_tests/sources/declarative/schema/test_caching_schema_loader_decorator.py (2)
  airbyte_cdk/sources/declarative/schema/schema_loader.py (1)
    SchemaLoader (11-17)
  airbyte_cdk/sources/declarative/schema/caching_schema_loader_decorator.py (2)
    CachingSchemaLoaderDecorator (6-15)
    get_json_schema (11-15)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
  airbyte_cdk/sources/declarative/schema/schema_loader.py (1)
    SchemaLoader (11-17)
  airbyte_cdk/sources/declarative/schema/caching_schema_loader_decorator.py (1)
    CachingSchemaLoaderDecorator (6-15)
airbyte_cdk/sources/declarative/schema/caching_schema_loader_decorator.py (1)
  airbyte_cdk/sources/declarative/schema/schema_loader.py (1)
    SchemaLoader (11-17)
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1)
  unit_tests/legacy/sources/declarative/test_declarative_stream.py (1)
    _schema_loader (243-246)
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1)
49-49: LGTM! Clean removal of local caching wrapper.

The direct assignment of schema_loader is appropriate now that caching is handled upstream in ModelToComponentFactory. This simplifies the class and avoids redundant caching layers.

airbyte_cdk/sources/declarative/schema/caching_schema_loader_decorator.py (1)
6-15: Consider thread safety for concurrent schema loading.

The decorator uses a simple _loaded_schema cache without synchronization. While schema loading is likely idempotent (worst case: redundant loads), there's a potential race condition where multiple threads could simultaneously check if self._loaded_schema is None and both proceed to load.

If concurrent access is possible, consider adding a lock or using a thread-safe initialization pattern. Alternatively, if the decorator instance is always used by a single thread (as suggested by the DeclarativePartitionFactory docstring mentioning per-thread retrievers), this might be fine as-is. Could you clarify the threading model here? Wdyt about adding a brief comment explaining the thread safety assumptions?
Additionally, the # type: ignore on line 15 might be unnecessary. After the check on line 12, _loaded_schema is guaranteed to be non-None. You could potentially use assert self._loaded_schema is not None before the return to satisfy the type checker without the ignore. Wdyt?

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
555-557: LGTM! Clean import additions.

The imports for SchemaLoader and CachingSchemaLoaderDecorator are well-placed and support the refactoring.
2099-2118: Verify the universal caching behavior is intentional.

The decorator is now applied to all schema loaders at line 2118, regardless of type (composite, dynamic, default, etc.). This is a behavioral change from the previous implementation where caching was only applied within DeclarativePartitionFactory.

A few things to consider:
- Caching scope: The decorator is created per call to create_default_stream, so each stream gets its own cached schema loader. This seems correct.
- Schema loader reuse: If the same schema loader instance is passed to multiple streams, each would get its own caching wrapper. This is probably fine but worth documenting.
- Type simplification: Changing the type hint from a union to just SchemaLoader at line 2099 is cleaner and works well with the decorator pattern.

Could you confirm this universal caching behavior is the intended design? It looks correct to me, but it's a meaningful change in when/where caching occurs. Wdyt about adding a brief comment at line 2118 explaining why we wrap with the caching decorator here?
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
2102-2122: Wrap guard, respect disable_cache, and avoid mutating model.parameters

The new centralized caching is great. Three small tweaks could make it safer and more predictable:
- Avoid mutating model.parameters when preparing DefaultSchemaLoader params by copying the dict.
- Respect self._disable_cache to allow forcing fresh schema loads in special runs.
- Prevent accidental double-wrapping if a custom loader is already decorated. Wdyt?

@@
-        schema_loader: SchemaLoader
+        schema_loader: SchemaLoader
@@
-        else:
-            options = model.parameters or {}
+        else:
+            options = dict(model.parameters or {})  # copy to avoid mutating the model
             if "name" not in options:
                 options["name"] = model.name
             schema_loader = DefaultSchemaLoader(config=config, parameters=options)
-        schema_loader = CachingSchemaLoaderDecorator(schema_loader)
+        # Centralized caching: respect disable_cache and avoid double-wrapping
+        if not self._disable_cache and not isinstance(schema_loader, CachingSchemaLoaderDecorator):
+            schema_loader = CachingSchemaLoaderDecorator(schema_loader)
Optional follow-up: If concurrent calls to get_json_schema are possible, would you consider adding a simple lock in CachingSchemaLoaderDecorator to avoid duplicate first-loads? Happy to sketch it.
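Building on the decorator sketch near the top of this page, the lock mentioned above could look roughly like this (illustrative only; the attribute names follow the review comments, while the locking itself is an assumption rather than the PR's code):

```python
import threading
from typing import Any, Mapping, Optional

from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader


class CachingSchemaLoaderDecorator(SchemaLoader):
    """Thread-safe sketch: the first get_json_schema() call loads and caches; later calls reuse the cache."""

    def __init__(self, decorated: SchemaLoader) -> None:
        self._decorated = decorated
        self._loaded_schema: Optional[Mapping[str, Any]] = None
        self._lock = threading.Lock()  # guards the initial load

    def get_json_schema(self) -> Mapping[str, Any]:
        # Double-checked locking: cheap fast path once the schema is cached,
        # and the lock ensures only one thread performs the first (possibly
        # expensive) load.
        if self._loaded_schema is None:
            with self._lock:
                if self._loaded_schema is None:
                    self._loaded_schema = self._decorated.get_json_schema()
        return self._loaded_schema
```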
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3 hunks)
- unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (2 hunks)
- unit_tests/sources/declarative/schema/test_caching_schema_loader_decorator.py (1 hunk)
🧰 Additional context used
🧬 Code graph analysis (3)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
  airbyte_cdk/sources/declarative/schema/caching_schema_loader_decorator.py (1)
    CachingSchemaLoaderDecorator (6-15)
unit_tests/sources/declarative/schema/test_caching_schema_loader_decorator.py (2)
  airbyte_cdk/sources/declarative/schema/schema_loader.py (1)
    SchemaLoader (11-17)
  airbyte_cdk/sources/declarative/schema/caching_schema_loader_decorator.py (2)
    CachingSchemaLoaderDecorator (6-15)
    get_json_schema (11-15)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
  airbyte_cdk/sources/declarative/schema/schema_loader.py (1)
    SchemaLoader (11-17)
  airbyte_cdk/sources/declarative/schema/caching_schema_loader_decorator.py (1)
    CachingSchemaLoaderDecorator (6-15)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: destination-motherduck
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Manifest Server Docker Image Build
🔇 Additional comments (5)
unit_tests/sources/declarative/schema/test_caching_schema_loader_decorator.py (1)
1-19: LGTM! Test correctly validates caching behavior.

The test properly verifies that the decorator caches the schema by ensuring the underlying get_json_schema is called only once across multiple invocations. Nice use of Mock(spec=SchemaLoader) for type safety.

Optional enhancement: You could also assert that all three calls return the same schema instance to more explicitly verify caching, wdyt?

 def test_given_previous_calls_when_get_json_schema_then_return_cached_schema(self):
     decorated = Mock(spec=SchemaLoader)
+    expected_schema = {"type": "object"}
+    decorated.get_json_schema.return_value = expected_schema
     schema_loader = CachingSchemaLoaderDecorator(decorated)
-    schema_loader.get_json_schema()
-    schema_loader.get_json_schema()
-    schema_loader.get_json_schema()
+    result1 = schema_loader.get_json_schema()
+    result2 = schema_loader.get_json_schema()
+    result3 = schema_loader.get_json_schema()
     assert decorated.get_json_schema.call_count == 1
+    assert result1 is result2 is result3 is expected_schema

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (2)
160-162: LGTM! Import added correctly.

The import of CachingSchemaLoaderDecorator is necessary for the updated get_schema_loader helper function and follows the existing import conventions.
5106-5111: LGTM! Smart approach to maintain test compatibility.

The updated helper function correctly validates that the schema loader is now wrapped with CachingSchemaLoaderDecorator (as per the PR objectives), while returning the unwrapped _decorated instance to maintain backward compatibility with all existing test assertions. This ensures existing tests continue to work without modification.

Note: Accessing the private _decorated attribute is acceptable in test code for verification purposes, though if this pattern becomes common, you might consider adding a public accessor method to the decorator, wdyt?
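If such an accessor were added, it could be a small read-only property. The subclass below is purely illustrative: neither AccessibleCachingSchemaLoaderDecorator nor a public decorated property exists in the PR, and the only assumptions carried over are the one-argument constructor and the _decorated attribute described above:

```python
from unittest.mock import Mock

from airbyte_cdk.sources.declarative.schema.caching_schema_loader_decorator import (
    CachingSchemaLoaderDecorator,
)
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader


class AccessibleCachingSchemaLoaderDecorator(CachingSchemaLoaderDecorator):
    """Illustration only: exposes the wrapped loader without reaching into `_decorated`."""

    @property
    def decorated(self) -> SchemaLoader:
        return self._decorated


inner = Mock(spec=SchemaLoader)
wrapper = AccessibleCachingSchemaLoaderDecorator(inner)
assert wrapper.decorated is inner  # a test helper could unwrap via the public property
```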
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)

554-557: Type narrowing to SchemaLoader looks good

Using the base interface here simplifies typing and avoids brittle unions. Nice cleanup.
558-560: No double caching detected: centralized approach is in place

Based on the verification, there's only one location in production code where CachingSchemaLoaderDecorator wraps the schema loader (line 2121 in model_to_component_factory.py). No other schema-loader caching decorators were found elsewhere in the codebase, and DeclarativePartitionFactory does not apply additional wrapping. The centralized caching design is clean and avoids redundant layers.
What
Addresses https://github.com/airbytehq/airbyte-internal-issues/issues/14931
How
Move CachingSchemaLoaderDecorator to wrap the schema loader instantiated in ModelToComponentFactory
Summary by CodeRabbit
Performance Improvements
Tests