fix(cdk): determine state from manager if not received a state in per partition router #544
📝 Walkthrough
Sequence Diagram(s)
sequenceDiagram
    participant Caller
    participant ModelToComponentFactory
    participant ConnectorStateManager
    Caller->>ModelToComponentFactory: _build_incremental_cursor(model)
    alt model.retriever == "AsyncRetriever" and incremental sync with stream slicer
        ModelToComponentFactory->>ConnectorStateManager: get_stream_state(stream_name, None)
        ConnectorStateManager-->>ModelToComponentFactory: stream_state
        ModelToComponentFactory->>ModelToComponentFactory: apply_state_migrations(stream_state)
        ModelToComponentFactory->>ModelToComponentFactory: create_concurrent_cursor_from_perpartition_cursor(stream_state)
    else
        ModelToComponentFactory->>ModelToComponentFactory: create_concurrent_cursor_from_perpartition_cursor({})
    end
    ModelToComponentFactory-->>Caller: IncrementalCursor
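The branch in the diagram above can be sketched in plain Python. This is a minimal illustration, not the CDK code: `FakeStateManager`, `resolve_stream_state`, and the migration callables are hypothetical stand-ins, and only the `get_stream_state(stream_name, None)` call shape is taken from the diagram.

```python
from typing import Any, Callable, Dict, List, Optional

StreamState = Dict[str, Any]
Migration = Callable[[StreamState], StreamState]


class FakeStateManager:
    """Hypothetical stand-in for ConnectorStateManager (assumption, not the real class)."""

    def __init__(self, states: Dict[str, StreamState]) -> None:
        self._states = states

    def get_stream_state(self, stream_name: str, namespace: Optional[str]) -> StreamState:
        # Return a copy so callers cannot mutate the stored state.
        return dict(self._states.get(stream_name, {}))


def resolve_stream_state(
    stream_name: str,
    state_manager: FakeStateManager,
    migrations: List[Migration],
    is_async_incremental_with_slicer: bool,
) -> StreamState:
    """Pick the state that would be handed to the per-partition cursor factory."""
    if not is_async_incremental_with_slicer:
        # The "else" branch of the diagram: the cursor is built from an empty state.
        return {}
    # The "alt" branch: read the state from the manager, then run migrations in order.
    state = state_manager.get_stream_state(stream_name, None)
    for migrate in migrations:
        state = migrate(state)
    return state


def rename_cursor_field(state: StreamState) -> StreamState:
    # Hypothetical migration: rename a legacy "updated_at" key to "cursor".
    if "updated_at" in state:
        return {"cursor": state["updated_at"]}
    return state


manager = FakeStateManager({"reports": {"updated_at": "2024-01-01"}})
resolved = resolve_stream_state("reports", manager, [rename_cursor_field], True)
```

A stream with no stored state still resolves to an empty mapping in this sketch, so both branches hand the factory the same type.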
Would you like to consider adding a test to confirm the fallback logic for
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
1-3827: Might need formatting to pass pipeline checks
I noticed the pipeline failures indicating that Ruff formatting checks are failing. Consider running ruff format on this file to fix the style issues and make the build pass.
What do you think about running the formatter before merging?
🧰 Tools
🪛 GitHub Actions: Linters
[error] 1-1: Ruff formatting check failed. File would be reformatted. Run 'ruff format' to fix code style issues.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
- airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py (1): get_stream_state (148-150)
- airbyte_cdk/sources/connector_state_manager.py (1): get_stream_state (53-67)
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Pytest (Fast)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
1493-1494: Good addition to ensure state propagation! LGTM 👍
Nice fix for ensuring state is available in the per partition router. This addresses the issue where state wasn't being passed downstream in async streaming scenarios with SubstreamPartitionRouter.
Thanks for digging deep into this! Just one thing I wanted to mention, but I'm fine with whichever approach we go with.
How easy/hard would this be to unit test?
…_from_perpartition_cursor
…hat is incremental should honor state
Actionable comments posted: 2
🧹 Nitpick comments (2)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
947-1036: Great comprehensive test for the core PR functionality.
This test thoroughly validates the state propagation fix for asynchronous retrievers with partition routers. The time freezing, detailed state setup, and assertions on various components of the cursor state provide excellent coverage.
A couple of suggestions to consider:
- Would it be valuable to add a docstring explaining what specific bug this test is verifying?
- The json import could use an absolute import style for consistency with the rest of the file, wdyt?
Otherwise, this is a well-crafted test that directly addresses the issue in the PR objectives.
unit_tests/sources/declarative/parsers/resources/stream_with_incremental_and_aync_retriever_with_partition_router.yaml (1)
140-140: Add a newline at EOF
YAMLLint flagged “no new line character at the end of file.” Can we add a blank line to satisfy the linter? wdyt?
🧰 Tools
🪛 YAMLlint (1.35.1)
[error] 140-140: no new line character at the end of file (new-line-at-end-of-file)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting
📒 Files selected for processing (2)
- unit_tests/sources/declarative/parsers/resources/stream_with_incremental_and_aync_retriever_with_partition_router.yaml (1 hunks)
- unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (5 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (5)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
🔇 Additional comments (7)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (6)
4-4: Good addition of deepcopy for immutable state handling.
This import provides a clean way to create a complete copy of the input config, which is essential for maintaining state immutability when working with connector states.
8-9: Solid import choices for file handling and type annotations.
The Path import provides a more robust way to handle file paths compared to string concatenation, and the comprehensive typing imports improve code clarity.
48-48: Good addition of ConcurrentPerPartitionCursor to imports.
This is directly related to the PR's focus on fixing state propagation in the concurrent cursor components.
173-176: Nice inclusion of concurrent cursor imports.
These imports support the core functionality that the PR is addressing with state conversion in the concurrent streaming context.
197-203: Well-structured helper function for factory creation.
This is a clean abstraction that allows tests to create factories with optional state management, improving test readability.
205-210: Good file reading utility function.
Using Path from pathlib provides a cross-platform way to handle file paths, which is more robust than string concatenation. The context manager ensures proper file handling.
unit_tests/sources/declarative/parsers/resources/stream_with_incremental_and_aync_retriever_with_partition_router.yaml (1)
30-35: Potential mis-shape of request_authentication block?
I see we’re nesting inject_into twice under request_authentication: one to specify RequestOption and another for header. Could we double-check the CDK spec? Typically an ApiKey injection is a flat mapping (inject_into: header + field_name: ...). Would you mind verifying this against the expected schema? wdyt?
We found that state is indeed not passed downstream for this flow here when:
I was able to reproduce this for:
- The source-bing-ads manifest migration.
- The current implementation of source-amazon-ads, e.g., sponsored_display_campaigns_report_stream_daily.
Downstream, the state is never passed to ConcurrentPerPartitionCursor._set_initial_state(stream_state), so we don't have a self._cursor_per_partition here. There is an alternative flow when no cursor is present, which is also followed in the Looms; neither flow seems to retrieve a state that can be used in the ConcurrentCursor.
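The effect described above can be illustrated with a toy model. This is a sketch under stated assumptions, not the CDK implementation: `ToyPerPartitionCursor`, its methods, the `updated_at` cursor field, and the `{"states": [{"partition": ..., "cursor": ...}]}` state shape are all assumptions made for illustration. It shows why a cursor whose initial state is never set falls back to the start date for every partition.

```python
import json
from typing import Any, Dict, Mapping


class ToyPerPartitionCursor:
    """Toy model of a per-partition cursor (hypothetical, for illustration only)."""

    def __init__(self, start: str) -> None:
        self._start = start
        # Stays empty unless set_initial_state is called with a non-empty state.
        self._cursor_per_partition: Dict[str, str] = {}

    @staticmethod
    def _key(partition: Mapping[str, Any]) -> str:
        # Serialize the partition deterministically so it can be used as a dict key.
        return json.dumps(partition, sort_keys=True)

    def set_initial_state(self, stream_state: Mapping[str, Any]) -> None:
        # Assumed state shape: {"states": [{"partition": {...}, "cursor": {...}}]}
        for entry in stream_state.get("states", []):
            key = self._key(entry["partition"])
            self._cursor_per_partition[key] = entry["cursor"]["updated_at"]

    def lower_bound(self, partition: Mapping[str, Any]) -> str:
        # Without a stored cursor the partition is read from the global start.
        return self._cursor_per_partition.get(self._key(partition), self._start)


saved_state = {
    "states": [{"partition": {"id": "1"}, "cursor": {"updated_at": "2024-05-01"}}]
}

without_state = ToyPerPartitionCursor(start="2020-01-01")
with_state = ToyPerPartitionCursor(start="2020-01-01")
with_state.set_initial_state(saved_state)
```

In this toy, `without_state` re-reads partition `{"id": "1"}` from the start date, while `with_state` resumes from the saved cursor value.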
Resolves https://github.com/airbytehq/airbyte-internal-issues/issues/13018