Skip to content

fix(cdk): determine state from manager if not received a state in per partition router #544

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 15, 2025

Conversation

aldogonzalez8
Copy link
Contributor

@aldogonzalez8 aldogonzalez8 commented May 14, 2025

We found that indeed it seems we have an issue where state is not passed downstream for this flow here when:

  1. Stream uses async retriever
  2. Asyc Retriever has a SubstreamPartitionRouter
  3. Is incremental stream

I was able to reproduce for:

Downstream, the state is never passed to ConcurrentPerPartitionCursor._set_initial_state(stream_state) then we don't have a self._cursor_per_partition here. There is an alternative flow when no cursor is present, which is also followed in the looms; neither seems to retrieve a state that can be used in the ConcurrentCursor.

Summary by CodeRabbit

  • New Features
    • Added support for asynchronous data retrieval with partitioned request handling in declarative streams.
    • Introduced incremental sync capabilities with datetime-based cursors and dynamic pagination.
    • Enabled session token authentication with automatic token refresh and API key injection.
  • Bug Fixes
    • Improved handling of stream state to ensure a valid state is always used when creating concurrent cursors, reducing the risk of errors when no stream state is provided.
    • Enhanced stream state migration application to maintain accurate and up-to-date stream states during incremental syncs.
  • Tests
    • Added comprehensive tests verifying asynchronous retriever behavior with partitioned cursors and incremental sync functionality.

Resolves https://github.com/airbytehq/airbyte-internal-issues/issues/13018

Copy link
Contributor

coderabbitai bot commented May 14, 2025

📝 Walkthrough

Walkthrough

The _build_incremental_cursor method in ModelToComponentFactory was updated to retrieve the stream state from the connector state manager when the retriever is an AsyncRetriever and an incremental sync with a stream slicer exists. This retrieved state is then passed to create_concurrent_cursor_from_perpartition_cursor instead of an empty dictionary. Additionally, create_concurrent_cursor_from_perpartition_cursor now applies stream state migrations to the provided state before cursor creation. No changes were made to method signatures or external interfaces.

Changes

File(s) Change Summary
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py Modified _build_incremental_cursor to fetch and pass actual stream state from _connector_state_manager for async retrievers with incremental stream slicers; updated create_concurrent_cursor_from_perpartition_cursor to apply state migrations before cursor creation. No signature changes.
unit_tests/sources/declarative/parsers/resources/stream_with_incremental_and_aync_retriever_with_partition_router.yaml Added new YAML declarative stream configuration defining an async retriever with partition router, incremental sync, session token authentication, paginated HTTP requesters, and nested component references.
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py Added new test test_stream_with_incremental_and_async_retriever_with_partition_router verifying async retriever with concurrent per-partition cursor behavior using the new YAML manifest; included helper functions for YAML loading and factory instantiation; updated imports accordingly.

Sequence Diagram(s)

sequenceDiagram
    participant Caller
    participant ModelToComponentFactory
    participant ConnectorStateManager

    Caller->>ModelToComponentFactory: _build_incremental_cursor(model)
    alt model.retriever == "AsyncRetriever" and incremental sync with stream slicer
        ModelToComponentFactory->>ConnectorStateManager: get_stream_state(stream_name, None)
        ConnectorStateManager-->>ModelToComponentFactory: stream_state
        ModelToComponentFactory->>ModelToComponentFactory: apply_state_migrations(stream_state)
        ModelToComponentFactory->>ModelToComponentFactory: create_concurrent_cursor_from_perpartition_cursor(stream_state)
    else
        ModelToComponentFactory->>ModelToComponentFactory: create_concurrent_cursor_from_perpartition_cursor({})
    end
    ModelToComponentFactory-->>Caller: IncrementalCursor
Loading

Possibly related PRs

Suggested labels

bug

Suggested reviewers

  • maxi297
  • tolik0
  • darynaishchenko

Would you like to consider adding a test to confirm the fallback logic for stream_state retrieval in async incremental syncs, wdyt?

Note

⚡️ AI Code Reviews for VS Code, Cursor, Windsurf

CodeRabbit now has a plugin for VS Code, Cursor and Windsurf. This brings AI code reviews directly in the code editor. Each commit is reviewed immediately, finding bugs before the PR is raised. Seamless context handoff to your AI code agent ensures that you can easily incorporate review feedback.
Learn more here.


Note

⚡️ Faster reviews with caching

CodeRabbit now supports caching for code and dependencies, helping speed up reviews. This means quicker feedback, reduced wait times, and a smoother review experience overall. Cached data is encrypted and stored securely. This feature will be automatically enabled for all accounts on May 16th. To opt out, configure Review - Disable Cache at either the organization or repository level. If you prefer to disable all data retention across your organization, simply turn off the Data Retention setting under your Organization Settings.
Enjoy the performance boost—your workflow just got faster.

✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

‼️ IMPORTANT
Auto-reply has been disabled for this repository in the CodeRabbit settings. The CodeRabbit bot will not respond to your replies unless it is explicitly tagged.

  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

1-3827: Might need formatting to pass pipeline checks

I noticed the pipeline failures indicating that Ruff formatting checks are failing. Consider running ruff format on this file to fix the style issues and make the build pass.

What do you think about running the formatter before merging?

🧰 Tools
🪛 GitHub Actions: Linters

[error] 1-1: Ruff formatting check failed. File would be reformatted. Run 'ruff format' to fix code style issues.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between dc10839 and 0164a45.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py (1)
  • get_stream_state (148-150)
airbyte_cdk/sources/connector_state_manager.py (1)
  • get_stream_state (53-67)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[error] 1-1: Ruff formatting check failed. File would be reformatted. Run 'ruff format' to fix code style issues.

⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

1493-1494: Good addition to ensure state propagation! LGTM 👍

Nice fix for ensuring state is available in the per partition router. This addresses the issue where state wasn't being passed downstream in async streaming scenarios with SubstreamPartitionRouter.

@aldogonzalez8 aldogonzalez8 changed the title bug(cdk): determine state from manager if not received a state in per partition router fix(cdk): determine state from manager if not received a state in per partition router May 14, 2025
@aldogonzalez8 aldogonzalez8 self-assigned this May 14, 2025
@aldogonzalez8 aldogonzalez8 requested review from tolik0 and brianjlai May 14, 2025 18:39
Copy link
Contributor

@brianjlai brianjlai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for digging deep into this! just one thing I wanted to mention, but whichever approach we go with i'm fine.

How easy/hard would this be to unit test?

@github-actions github-actions bot added the bug Something isn't working label May 15, 2025
coderabbitai[bot]
coderabbitai bot previously requested changes May 15, 2025
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (2)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)

947-1036: Great comprehensive test for the core PR functionality.

This test thoroughly validates the state propagation fix for asynchronous retrievers with partition routers. The time freezing, detailed state setup, and assertions on various components of the cursor state provide excellent coverage.

A couple of suggestions to consider:

  • Would it be valuable to add a docstring explaining what specific bug this test is verifying?
  • The json import could use an absolute import style for consistency with the rest of the file, wdyt?

Otherwise, this is a well-crafted test that directly addresses the issue in the PR objectives.

unit_tests/sources/declarative/parsers/resources/stream_with_incremental_and_aync_retriever_with_partition_router.yaml (1)

140-140: Add a newline at EOF
YAMLLint flagged “no new line character at the end of file.” Can we add a blank line to satisfy the linter? wdyt?

🧰 Tools
🪛 YAMLlint (1.35.1)

[error] 140-140: no new line character at the end of file

(new-line-at-end-of-file)

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting

📥 Commits

Reviewing files that changed from the base of the PR and between e0e2eed and a28ca81.

📒 Files selected for processing (2)
  • unit_tests/sources/declarative/parsers/resources/stream_with_incremental_and_aync_retriever_with_partition_router.yaml (1 hunks)
  • unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (5 hunks)
🧰 Additional context used
🪛 YAMLlint (1.35.1)
unit_tests/sources/declarative/parsers/resources/stream_with_incremental_and_aync_retriever_with_partition_router.yaml

[error] 140-140: no new line character at the end of file

(new-line-at-end-of-file)

⏰ Context from checks skipped due to timeout of 90000ms (5)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
🔇 Additional comments (7)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (6)

4-4: Good addition of deepcopy for immutable state handling.

This import provides a clean way to create a complete copy of the input config, which is essential for maintaining state immutability when working with connector states.


8-9: Solid import choices for file handling and type annotations.

The Path import provides a more robust way to handle file paths compared to string concatenation, and the comprehensive typing imports improve code clarity.


48-48: Good addition of ConcurrentPerPartitionCursor to imports.

This is directly related to the PR's focus on fixing state propagation in the concurrent cursor components.


173-176: Nice inclusion of concurrent cursor imports.

These imports support the core functionality that the PR is addressing with state conversion in the concurrent streaming context.


197-203: Well-structured helper function for factory creation.

This is a clean abstraction that allows tests to create factories with optional state management, improving test readability.


205-210: Good file reading utility function.

Using Path from pathlib provides a cross-platform way to handle file paths, which is more robust than string concatenation. The context manager ensures proper file handling.

unit_tests/sources/declarative/parsers/resources/stream_with_incremental_and_aync_retriever_with_partition_router.yaml (1)

30-35: Potential mis-shape of request_authentication block?
I see we’re nesting inject_into twice under request_authentication—one to specify RequestOption and another for header. Could we double-check the CDK spec? Typically an ApiKey injection is a flat mapping (inject_into: header + field_name: ...). Would you mind verifying this against the expected schema? wdyt?

@aldogonzalez8 aldogonzalez8 merged commit db07eff into main May 15, 2025
26 of 28 checks passed
@aldogonzalez8 aldogonzalez8 deleted the ac8/fix-missing-state-for-perpartition-cursor branch May 15, 2025 17:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants