fix: Properly setup global substream cursor based on manifest #490

Merged
merged 6 commits into from
Apr 17, 2025

Conversation

Contributor

@maxi297 maxi297 commented Apr 17, 2025

This PR adds support for global cursors when running in the concurrent engine.

Summary by CodeRabbit

  • Bug Fixes
    • Improved global cursor handling to support explicit overrides for enhanced data synchronization.
  • Tests
    • Added a test verifying global cursor usage and accurate state management without per-partition states.

@maxi297 maxi297 requested review from tolik0 and lazebnyi April 17, 2025 13:24
@github-actions github-actions bot added the bug (Something isn't working) and security labels Apr 17, 2025
Contributor Author

maxi297 commented Apr 17, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formatting issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

Contributor

coderabbitai bot commented Apr 17, 2025

📝 Walkthrough

This change updates the logic in the create_concurrent_cursor_from_perpartition_cursor method within the ModelToComponentFactory class. Now, the method sets the use_global_cursor flag to True not only when the partition_router is an instance of GroupingPartitionRouter, but also when the component_definition dictionary contains a truthy "global_substream_cursor" key. Additionally, a new test was added to verify that when this flag is enabled, the state returned is global rather than per-partition. The _group_streams method in concurrent_declarative_source.py was also updated to include GlobalSubstreamCursor in its type checks. No changes were made to public interfaces or method signatures; the update is limited to internal logic and testing.
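
The condition described in the walkthrough can be sketched roughly as follows. This is a minimal illustration, not the CDK's actual code: `GroupingPartitionRouter` here is an empty stand-in class, `OtherPartitionRouter` and `should_use_global_cursor` are hypothetical names, and only the `"global_substream_cursor"` key and the instance check come from the description above.

```python
from typing import Any, Mapping


class GroupingPartitionRouter:
    """Stand-in for the CDK class of the same name (assumption: empty placeholder)."""


class OtherPartitionRouter:
    """Hypothetical router used only to exercise the non-grouping branch."""


def should_use_global_cursor(
    partition_router: object, component_definition: Mapping[str, Any]
) -> bool:
    # Hypothetical helper: the flag is enabled when the router groups partitions
    # or when the component definition carries a truthy "global_substream_cursor".
    return isinstance(partition_router, GroupingPartitionRouter) or bool(
        component_definition.get("global_substream_cursor")
    )
```

Using `.get()` with `bool()` means a missing key and an explicit `False` behave the same way, which matches the "truthy key" wording in the walkthrough.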

Changes

| File(s) | Change Summary |
| --- | --- |
| airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py | Modified the condition for setting use_global_cursor in create_concurrent_cursor_from_perpartition_cursor to also check for a "global_substream_cursor" key in the component definition. |
| airbyte_cdk/sources/declarative/concurrent_declarative_source.py | Expanded _group_streams method condition to include GlobalSubstreamCursor alongside PerPartitionWithGlobalCursor for stream slicer type checks. |
| unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py | Added a new test test_given_global_state_when_read_then_state_is_not_per_partition verifying that enabling "global_substream_cursor" results in a global cursor state rather than per-partition states. |
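
To illustrate the kind of assertion the new test makes, the sketch below distinguishes a global-only state from one that also carries per-partition entries. The key names `state` and `states`, the sample values, and the helper `has_per_partition_states` are all assumptions made for this example; this page does not show the actual state layout.

```python
from typing import Any, Mapping


def has_per_partition_states(stream_state: Mapping[str, Any]) -> bool:
    # Assumption: per-partition entries, when present, live under a "states" list.
    return bool(stream_state.get("states"))


# Hypothetical state blobs, for illustration only.
global_only = {"state": {"updated_at": "2025-04-17T00:00:00Z"}}
with_partitions = {
    "state": {"updated_at": "2025-04-17T00:00:00Z"},
    "states": [
        {
            "partition": {"parent_id": 1},
            "cursor": {"updated_at": "2025-04-16T00:00:00Z"},
        }
    ],
}
```

A test in this spirit would read the stream with the flag enabled and check that the helper returns `False` for the emitted state.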

Possibly related PRs

Suggested reviewers

  • maxi297
  • tolik0

Would you like to add more tests to cover edge cases around the "global_substream_cursor" flag behavior, wdyt?

📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 271a66d and 7630382.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2 hunks)
  • unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
🧰 Additional context used
🧬 Code Graph Analysis (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
  • ConcurrentPerPartitionCursor (46-498)
airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py (1)
  • GlobalSubstreamCursor (71-357)
airbyte_cdk/sources/declarative/incremental/per_partition_with_global.py (1)
  • PerPartitionWithGlobalCursor (20-200)
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)

22-25: Looks good! The GlobalSubstreamCursor import is now included.

The addition of GlobalSubstreamCursor to the import statement is logical since it's needed for the type check update below. This is a necessary change to support the global substream cursor functionality.


367-369:

✅ Verification successful

Type check expanded to include GlobalSubstreamCursor, nice improvement!

The change to include GlobalSubstreamCursor in the instance check alongside PerPartitionWithGlobalCursor properly extends the functionality to support both cursor types for datetime-based cursor streams with partition routing.

This change aligns well with the updates to create_concurrent_cursor_from_perpartition_cursor method in the ModelToComponentFactory class that now considers the "global_substream_cursor" flag in component definitions.
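
A rough sketch of the expanded type check, assuming placeholder classes in place of the real CDK cursors. `SomeOtherSlicer` and `accepts_stream_slicer` are hypothetical names, and the real `_group_streams` condition likely includes other checks that are not shown on this page.

```python
class PerPartitionWithGlobalCursor:
    """Placeholder for the CDK class (assumption: no behavior modeled)."""


class GlobalSubstreamCursor:
    """Placeholder for the CDK class (assumption: no behavior modeled)."""


class SomeOtherSlicer:
    """Hypothetical slicer that should not match the check."""


def accepts_stream_slicer(stream_slicer: object) -> bool:
    # isinstance accepts a tuple of types, so supporting an additional
    # cursor class only requires adding it to the tuple.
    return isinstance(
        stream_slicer, (PerPartitionWithGlobalCursor, GlobalSubstreamCursor)
    )
```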


🏁 Script executed:

#!/bin/bash
# Description: Verify that GlobalSubstreamCursor is properly implemented in the codebase

# Check for any test files that verify the functionality of GlobalSubstreamCursor
echo "Checking for test files related to GlobalSubstreamCursor..."
fd -t f "test.*global.*substream.*cursor" -E "*.pyc"

# Look for usages of GlobalSubstreamCursor in the codebase
echo "Looking for GlobalSubstreamCursor usage in the codebase..."
rg "GlobalSubstreamCursor" -A 3 -B 3 --type py

Length of output: 17380


GlobalSubstreamCursor coverage verified
I’ve confirmed that GlobalSubstreamCursor is exercised in unit_tests/sources/declarative/incremental/test_per_partition_cursor.py and unit_tests/sources/declarative/extractors/test_record_filter.py, and the new instance check in concurrent_declarative_source.py aligns with the CursorFactory updates. No further action appears necessary—wdyt?

Contributor

@lazebnyi lazebnyi left a comment

lgtm

Contributor Author

maxi297 commented Apr 17, 2025

/autofix

✅ Changes applied successfully.

Contributor Author

maxi297 commented Apr 17, 2025

/autofix

✅ Changes applied successfully.
