Skip to content

fix(concurrent): Ensure default concurrency level does not generate deadlock #148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 9, 2024

Conversation

maxi297
Copy link
Contributor

@maxi297 maxi297 commented Dec 6, 2024

Given a stream that has a lot of partitions, we can end up in a case where the only thread (which is a PartitionEnqueuer that is generating partition) can't generate more partitions because they are not consumed.

This comment explains how we limit the number of partitions being generated. Basically, every time we try to add a partition to the list of partitions ready to be consumed, we do this:

  • prune the list of futures to only keep the partition reader threads that have been completed
  • if the number of futures remaining is higher than a certain value, the thread will sleep before checking again if we can create a thread for this partition

The locks on this algorithm seems approximative hence why we have this log in case we really go too much over it:

        if len(self._futures) > self._logging_threshold:
            self._logger.warning(
                f"ThreadPoolManager: The list of futures is getting bigger than expected ({len(self._futures)})"
            )

The current limit is 10 000 threads waiting in the queue.

In order to prevent if nothing is configured, we will set the default to 2 threads.

Summary by CodeRabbit

  • New Features
    • Introduced a new field definition for flexible data manipulation within component schemas.
  • Improvements
    • Updated concurrency handling to enhance performance and prevent potential deadlocks.
    • Refined authentication sections for clearer usage instructions.
  • Documentation
    • Enhanced schema definitions with improved descriptions and examples for better understanding.

@github-actions github-actions bot added the bug Something isn't working label Dec 6, 2024
Copy link
Contributor

coderabbitai bot commented Dec 6, 2024

📝 Walkthrough

Walkthrough

The changes in this pull request involve significant updates to the ConcurrentDeclarativeSource class and the declarative_component_schema.yaml file. A new class-level constant, _LOWEST_SAFE_CONCURRENCY_LEVEL, has been introduced to replace the previous concurrency level constant. This impacts the initialization logic and the handling of partitions. Additionally, the schema file has been enhanced with a new field definition, updates to existing definitions, and clarifications regarding error handling and authentication properties.

Changes

File Path Change Summary
airbyte_cdk/sources/declarative/concurrent_declarative_source.py - Updated concurrency constant from SINGLE_THREADED_CONCURRENCY_LEVEL = 1 to _LOWEST_SAFE_CONCURRENCY_LEVEL = 2.
- Adjusted initialization logic to default to the new concurrency level.
- Maintained error handling for component generation.
airbyte_cdk/sources/declarative/declarative_component_schema.yaml - Added new field definition AddedFieldDefinition.
- Updated ConcurrencyLevel description to include deadlock warnings.
- Refined BearerAuthenticator and ApiKeyAuthenticator definitions regarding the inject_into property.

Possibly related PRs

  • chore: fix typing issues #135: The changes in this PR also modify the ConcurrentDeclarativeSource class in the same file, focusing on stream handling and error checking, which may interact with the new concurrency model introduced in the main PR.

Suggested reviewers

  • maxi297

📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 59505ff and 153b25b.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2 hunks)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1 hunks)
👮 Files not reviewed due to content moderation or server errors (2)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

‼️ IMPORTANT
Auto-reply has been disabled for this repository in the CodeRabbit settings. The CodeRabbit bot will not respond to your replies unless it is explicitly tagged.

  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@maxi297 maxi297 changed the title fix(concurrency): Ensure default concurrency level does not generate deadlock fix(concurrent): Ensure default concurrency level does not generate deadlock Dec 6, 2024
@maxi297 maxi297 requested review from brianjlai and tolik0 December 6, 2024 18:48
Copy link
Contributor

@brianjlai brianjlai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a couple questions! but makes sense!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants