Skip to content

fix(concurrent-perpartition-cursor): Fix memory issues #568

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jun 12, 2025

Conversation

tolik0
Copy link
Contributor

@tolik0 tolik0 commented May 28, 2025

Summary by CodeRabbit

  • Bug Fixes

    • Improved memory management and resource cleanup for partition processing, reducing the risk of memory leaks during concurrent operations.
  • Tests

    • Enhanced test coverage for partition handling, including scenarios with duplicate partitions and improved assertions on internal state cleanup.

@github-actions github-actions bot added the bug Something isn't working label May 28, 2025
Copy link
Contributor

coderabbitai bot commented May 28, 2025

📝 Walkthrough

Walkthrough

The update introduces explicit sequence tracking and enhanced cleanup logic for partition lifecycle management in the concurrent partition cursor. Internal data structures are refined to better manage partition states, sequence numbers, and resource cleanup. Unit tests are expanded and new tests are added to cover duplicate partition handling and verify the correctness of internal state management.

Changes

File(s) Change Summary
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py Refactored partition lifecycle logic: added _cleanup_if_done for resource cleanup, introduced sequence tracking attributes, updated parent state management to use sequence numbers, and improved finished partition removal.
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py Expanded assertions in existing tests to cover new internal tracking structures, and added new tests for duplicate partition scenarios. Introduced helper function _make_inner_cursor.

Sequence Diagram(s)

sequenceDiagram
    participant ParentCursor
    participant ConcurrentCursor
    participant Partition
    participant InnerCursor

    ParentCursor->>ConcurrentCursor: Create new partition
    ConcurrentCursor->>Partition: Assign sequence number, track state
    Partition->>InnerCursor: Generate slices
    InnerCursor-->>Partition: Return slices
    Partition-->>ConcurrentCursor: Mark partition as done
    ConcurrentCursor->>ConcurrentCursor: Cleanup partition resources
    ConcurrentCursor->>ParentCursor: Update parent state (if all prior partitions complete)
Loading

Would you like to also see a diagram for the duplicate partition handling logic, or does this overview suffice for your needs? Wdyt?

✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

‼️ IMPORTANT
Auto-reply has been disabled for this repository in the CodeRabbit settings. The CodeRabbit bot will not respond to your replies unless it is explicitly tagged.

  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🔭 Outside diff range comments (1)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)

1-1: ⚠️ Potential issue

Fix the formatting issue to pass pipeline checks

The pipeline is failing due to formatting issues. Could you run ruff format on this file to fix the code style issues? This will help ensure the PR passes all checks, wdyt?

🧰 Tools
🪛 GitHub Actions: Linters

[error] 1-1: Ruff formatting check failed. File would be reformatted. Run 'ruff format' to fix code style issues.

🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)

102-108: Well-designed sequence tracking data structures!

The choice of data structures is appropriate:

  • OrderedDict for maintaining insertion order
  • deque for efficient sequence tracking
  • Simple counter for sequence generation

Would it be helpful to add a brief comment explaining the relationship between these new attributes? Something like "These attributes work together to track partition processing order and enable safe parent state updates"? wdyt?

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (2)

3439-3440: Verify the semaphore cleanup behavior aligns with the new implementation

I see you've updated the expected semaphore count from 2 to 1, reflecting that semaphores are now cleaned up after partition completion. This looks correct based on the PR's objective to fix memory issues.

The TODO comment mentions adding checks for other internal values - would you like to expand on what specific internal state should be validated here? Perhaps checking _partition_parent_state_map or _finished_partitions could strengthen this test, wdyt?


3523-3524: Confirm the zero semaphore expectation is correct

The assertion now expects 0 semaphores instead of 1, which is consistent with the more aggressive cleanup behavior mentioned in the AI summary. This aligns well with the memory issue fixes.

Similar to the previous test, the TODO suggests adding more internal state checks. Would it be helpful to validate the state of _cursor_per_partition or other cleanup-related internal structures here as well, wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 155cdc8 and 45d3eaf.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (6 hunks)
  • unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (2 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

[error] 1-1: Ruff formatting check failed. File would be reformatted. Run 'ruff format' to fix code style issues.

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

[error] 5-29: Ruff: Import block is un-sorted or un-formatted. Organize imports. 1 fixable error with the --fix option.

⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (4)

169-171: Good placement of cleanup logic.

Cleaning up resources after updating the global cursor ensures that state is consistent before freeing memory.


180-189: Elegant sequence-based parent state update logic!

The implementation correctly ensures that parent state is only updated when all partitions with lower or equal sequence numbers have been closed. The use of deque[0] for O(1) access to the oldest open sequence is efficient.


273-277: Proper thread-safe sequence assignment.

The sequence assignment is correctly synchronized under the lock, ensuring unique sequences for each partition.


323-323: Consistent partition removal check.

Good change! Checking for presence in _seq_by_partition is more reliable since cleaned-up partitions are removed from this mapping.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)

9-9: Import formatting issue persists.

This is the same import sorting issue that was flagged in previous reviews and is causing pipeline failures. Could you run ruff --fix to resolve this automatically? wdyt?


463-481: ⚠️ Potential issue

Thread safety and performance concerns in cleanup method.

This method has the same issues flagged in previous reviews:

  1. Thread safety: The method accesses shared data structures without explicit locking documentation
  2. Performance: deque.remove(seq) on line 478 is O(n) operation
  3. Protected member access: Line 470 accesses _value which static analysis flags
  4. Logging format: Line 480 should use lazy formatting

Would you consider addressing these concerns as suggested in the previous review? wdyt?

     def _cleanup_if_done(self, partition_key: str) -> None:
         """
         Free every in-memory structure that belonged to a completed partition:
         cursor, semaphore, flag inside `_finished_partitions`
+        
+        Note: This method must be called while holding self._lock
         """
+        assert self._lock.locked(), "_cleanup_if_done must be called while holding the lock"
+        
         if not (
                 partition_key in self._finished_partitions
-                and self._semaphore_per_partition[partition_key]._value == 0
+                and self._semaphore_per_partition[partition_key]._value == 0  # type: ignore[attr-defined]
         ):
            return

         self._semaphore_per_partition.pop(partition_key, None)
         self._finished_partitions.discard(partition_key)

-        seq = self._seq_by_partition.pop(partition_key)
-        self._open_seqs.remove(seq)
+        seq = self._seq_by_partition.pop(partition_key, None)
+        if seq is not None:
+            try:
+                self._open_seqs.remove(seq)
+            except ValueError:
+                logger.warning("Sequence %s not found in open_seqs for partition %s", seq, partition_key)

-        logger.debug(f"Partition {partition_key} fully processed and cleaned up.")
+        logger.debug("Partition %s fully processed and cleaned up.", partition_key)
🧰 Tools
🪛 Pylint (3.3.7)

[warning] 470-470: Access to a protected member _value of a client class

(W0212)


[warning] 480-480: Use lazy % formatting in logging functions

(W1203)

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)

182-200: Address the unused variable warning.

The static analysis tool flagged earliest_key as unused on line 186. Since you're only using the value part of the tuple, would you consider using _ for the unused key? wdyt?

-            earliest_key, (candidate_state, candidate_seq) = next(
+            _, (candidate_state, candidate_seq) = next(
                 iter(self._partition_parent_state_map.items())
             )
🧰 Tools
🪛 Pylint (3.3.7)

[warning] 186-186: Unused variable 'earliest_key'

(W0612)

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 45d3eaf and 0ccfb19.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (6 hunks)
  • unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (4 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (3)
airbyte_cdk/sources/streams/checkpoint/per_partition_key_serializer.py (1)
  • PerPartitionKeySerializer (7-22)
airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py (1)
  • Timer (50-68)
airbyte_cdk/sources/streams/concurrent/cursor.py (1)
  • _emit_state_message (274-283)
🪛 Pylint (3.3.7)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

[warning] 186-186: Unused variable 'earliest_key'

(W0612)


[refactor] 288-288: Do not raise StopIteration in generator, use return statement instead

(R1708)


[warning] 470-470: Access to a protected member _value of a client class

(W0212)


[warning] 480-480: Use lazy % formatting in logging functions

(W1203)

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

[warning] 3440-3440: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3441-3441: Access to a protected member _finished_partitions of a client class

(W0212)


[warning] 3442-3442: Access to a protected member _open_seqs of a client class

(W0212)


[warning] 3443-3443: Access to a protected member _seq_by_partition of a client class

(W0212)


[warning] 3528-3528: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3529-3529: Access to a protected member _finished_partitions of a client class

(W0212)


[warning] 3530-3530: Access to a protected member _open_seqs of a client class

(W0212)


[warning] 3531-3531: Access to a protected member _seq_by_partition of a client class

(W0212)


[warning] 3608-3608: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3609-3609: Access to a protected member _finished_partitions of a client class

(W0212)


[warning] 3610-3610: Access to a protected member _open_seqs of a client class

(W0212)


[warning] 3611-3611: Access to a protected member _seq_by_partition of a client class

(W0212)


[warning] 3730-3730: Access to a protected member _open_seqs of a client class

(W0212)


[warning] 3731-3731: Access to a protected member _seq_by_partition of a client class

(W0212)


[warning] 3732-3732: Access to a protected member _partition_parent_state_map of a client class

(W0212)


[warning] 3733-3733: Access to a protected member _partition_parent_state_map of a client class

(W0212)


[warning] 3740-3740: Access to a protected member _finished_partitions of a client class

(W0212)


[warning] 3741-3741: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3742-3742: Access to a protected member _open_seqs of a client class

(W0212)


[warning] 3743-3743: Access to a protected member _seq_by_partition of a client class

(W0212)

🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

[error] 5-29: Ruff import sorting error: Import block is un-sorted or un-formatted. Run 'ruff --fix' to automatically fix import order.

⏰ Context from checks skipped due to timeout of 90000ms (6)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
🔇 Additional comments (9)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (4)

102-114: The sequence tracking data structures look well-designed.

The new attributes for sequence-based tracking provide a clear approach to managing partition lifecycle and parent state updates. The type hints are correct and the comments are helpful.


175-176: Good addition of cleanup call.

Adding the cleanup call after partition processing ensures proper resource deallocation. This aligns well with the new sequence-based tracking system.


280-293: The sequence assignment logic looks correct.

The implementation properly assigns unique sequence numbers and tracks them in the appropriate data structures. The parent state mapping with sequence numbers is a good approach for coordinating updates.

🧰 Tools
🪛 Pylint (3.3.7)

[refactor] 288-288: Do not raise StopIteration in generator, use return statement instead

(R1708)


330-330:

✅ Verification successful

Verify the cleanup detection logic change.

The change from checking _finished_partitions to checking _seq_by_partition for cleanup detection makes sense with the new sequence tracking. Could you confirm this correctly identifies partitions that have been fully processed and cleaned up? wdyt?


🏁 Script executed:

#!/bin/bash
# Description: Search for other uses of _finished_partitions to ensure consistency
rg "_finished_partitions" -A 3 -B 3

Length of output: 8910


🏁 Script executed:

#!/bin/bash
# Display code context around lines 250-360 to locate the cleanup logic
sed -n '250,360p' airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Length of output: 5581


Verify cleanup detection via _seq_by_partition

It looks like _ensure_partition_limit now drops partitions when partition_key not in self._seq_by_partition, which corresponds to partitions that have already been cleaned up by _cleanup_if_done (where we pop from _seq_by_partition). Given that:

  • We add an entry to _seq_by_partition at slice generation start.
  • We only remove that entry in _cleanup_if_done once the last slice is emitted and the semaphore count is zero.

That means by the time _ensure_partition_limit runs, any partition absent from _seq_by_partition has indeed finished and been cleaned up. wdyt?

• No other code still uses _finished_partitions for removal—cleanup and limit logic are now consistently driven by _seq_by_partition.

resolve_review_comment

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (5)

3439-3443: Excellent addition of sequence-based tracking validation!

These new assertions effectively test the internal state management for the sequence-based partition tracking. The checks for _finished_partitions, _open_seqs, and _seq_by_partition align perfectly with the memory leak fixes mentioned in the PR.

One small question - would it be helpful to also verify that the counts match expected values rather than just checking lengths? For instance, asserting that _open_seqs contains exactly the expected sequence numbers, wdyt?

🧰 Tools
🪛 Pylint (3.3.7)

[warning] 3440-3440: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3441-3441: Access to a protected member _finished_partitions of a client class

(W0212)


[warning] 3442-3442: Access to a protected member _open_seqs of a client class

(W0212)


[warning] 3443-3443: Access to a protected member _seq_by_partition of a client class

(W0212)


3527-3531: Consistent cleanup verification patterns - nicely done!

The pattern of verifying all internal data structures are properly managed is consistent across test methods. This helps ensure the sequence-based cleanup logic works correctly in different scenarios.

🧰 Tools
🪛 Pylint (3.3.7)

[warning] 3528-3528: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3529-3529: Access to a protected member _finished_partitions of a client class

(W0212)


[warning] 3530-3530: Access to a protected member _open_seqs of a client class

(W0212)


[warning] 3531-3531: Access to a protected member _seq_by_partition of a client class

(W0212)


3606-3611: Thorough cleanup validation for memory leak prevention!

These assertions are crucial for verifying that the memory issues are resolved. Checking that all internal variables (_semaphore_per_partition, _finished_partitions, _open_seqs, _seq_by_partition) are cleaned up after processing ensures no resource leaks.

🧰 Tools
🪛 Pylint (3.3.7)

[warning] 3608-3608: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3609-3609: Access to a protected member _finished_partitions of a client class

(W0212)


[warning] 3610-3610: Access to a protected member _open_seqs of a client class

(W0212)


[warning] 3611-3611: Access to a protected member _seq_by_partition of a client class

(W0212)


3730-3733: Smart validation of sequence-based parent state mapping!

The assertions verify that _partition_parent_state_map entries now include sequence numbers as expected from the refactoring. Lines 3732-3733 effectively test that the partition keys map to tuples containing both parent state and sequence information.

Just wondering - should we also validate that the sequence numbers are in the expected order or range, wdyt?

🧰 Tools
🪛 Pylint (3.3.7)

[warning] 3730-3730: Access to a protected member _open_seqs of a client class

(W0212)


[warning] 3731-3731: Access to a protected member _seq_by_partition of a client class

(W0212)


[warning] 3732-3732: Access to a protected member _partition_parent_state_map of a client class

(W0212)


[warning] 3733-3733: Access to a protected member _partition_parent_state_map of a client class

(W0212)


3740-3743: Comprehensive cleanup verification after partition closure!

The test properly validates that closing partitions triggers the cleanup of all sequence-related tracking structures. This is essential for confirming the memory issue fixes work as intended.

🧰 Tools
🪛 Pylint (3.3.7)

[warning] 3740-3740: Access to a protected member _finished_partitions of a client class

(W0212)


[warning] 3741-3741: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3742-3742: Access to a protected member _open_seqs of a client class

(W0212)


[warning] 3743-3743: Access to a protected member _seq_by_partition of a client class

(W0212)

@tolik0
Copy link
Contributor Author

tolik0 commented Jun 2, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formating issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)

463-481: 🛠️ Refactor suggestion

Address thread safety documentation and performance concerns.

This is a duplicate of previous feedback about the cleanup method. The implementation has several areas for improvement that were identified in past reviews:

  1. Protected member access: Line 470 accesses _semaphore_per_partition[partition_key]._value which is a protected member
  2. Performance: deque.remove(seq) on line 478 is O(n) operation
  3. Error handling: Missing error handling for edge cases
  4. Logging format: Should use lazy formatting

Here's an improved version addressing these concerns:

 def _cleanup_if_done(self, partition_key: str) -> None:
     """
     Free every in-memory structure that belonged to a completed partition:
     cursor, semaphore, flag inside `_finished_partitions`
+    
+    Note: This method must be called while holding self._lock
     """
+    assert self._lock.locked(), "_cleanup_if_done must be called while holding the lock"
+    
     if not (
             partition_key in self._finished_partitions
-            and self._semaphore_per_partition[partition_key]._value == 0
+            and self._semaphore_per_partition.get(partition_key, threading.Semaphore(1)).acquire(blocking=False)
     ):
        return
+    # Release the semaphore we just acquired for the check
+    if partition_key in self._semaphore_per_partition:
+        self._semaphore_per_partition[partition_key].release()

     self._semaphore_per_partition.pop(partition_key, None)
     self._finished_partitions.discard(partition_key)

-    seq = self._seq_by_partition.pop(partition_key)
-    self._open_seqs.remove(seq)
+    seq = self._seq_by_partition.pop(partition_key, None)
+    if seq is not None:
+        try:
+            self._open_seqs.remove(seq)
+        except ValueError:
+            logger.warning("Sequence %s not found in open_seqs for partition %s", seq, partition_key)

-    logger.debug(f"Partition {partition_key} fully processed and cleaned up.")
+    logger.debug("Partition %s fully processed and cleaned up.", partition_key)

What do you think about this approach? The semaphore check is still not ideal due to the race condition, but it's safer than accessing protected members. wdyt?

🧰 Tools
🪛 Pylint (3.3.7)

[warning] 470-470: Access to a protected member _value of a client class

(W0212)


[warning] 480-480: Use lazy % formatting in logging functions

(W1203)

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)

186-196: Fix unused variable and consider edge case.

The logic improvement using sequence numbers is great! However, there's an unused variable that should be removed.

-        while self._partition_parent_state_map:
-            earliest_key, (candidate_state, candidate_seq) = next(
-                iter(self._partition_parent_state_map.items())
-            )
+        while self._partition_parent_state_map:
+            _, (candidate_state, candidate_seq) = next(
+                iter(self._partition_parent_state_map.items())
+            )

Also, wdyt about adding a safety check for empty _open_seqs to be more explicit about the condition?

🧰 Tools
🪛 Pylint (3.3.7)

[warning] 186-186: Unused variable 'earliest_key'

(W0612)

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0ccfb19 and 276bb8c.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (6 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (3)
airbyte_cdk/sources/streams/checkpoint/per_partition_key_serializer.py (1)
  • PerPartitionKeySerializer (7-22)
airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py (1)
  • Timer (50-68)
airbyte_cdk/sources/streams/concurrent/cursor.py (1)
  • _emit_state_message (274-283)
🪛 Pylint (3.3.7)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

[warning] 186-186: Unused variable 'earliest_key'

(W0612)


[refactor] 288-288: Do not raise StopIteration in generator, use return statement instead

(R1708)


[warning] 470-470: Access to a protected member _value of a client class

(W0212)


[warning] 480-480: Use lazy % formatting in logging functions

(W1203)

⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
🔇 Additional comments (7)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (7)

9-9: Import looks good!

The addition of deque import is necessary for the new sequence tracking functionality.


102-104: Nice enhancement to the data structure!

Storing tuples with both parent state and sequence number enables better coordination of parent state updates. The type annotation clearly indicates the structure.


109-114: Good addition of sequence tracking attributes!

The new attributes provide the foundation for proper partition lifecycle management and resource cleanup. The comments clearly explain their purpose.


175-176: Excellent addition of cleanup logic!

Calling cleanup after partition processing ensures proper resource deallocation and prevents memory leaks.


280-284: Solid sequence assignment implementation!

The sequence generation and tracking logic is clean and thread-safe within the lock context.


289-292: Good integration of sequence numbers!

The parent state map now properly stores the sequence number alongside the state, enabling the improved coordination logic.


330-330: Smart optimization for cleanup detection!

Using _seq_by_partition to identify finished partitions is more efficient than checking multiple conditions.

@tolik0 tolik0 self-assigned this Jun 2, 2025
@tolik0 tolik0 requested a review from maxi297 June 2, 2025 16:06
Copy link
Contributor

@maxi297 maxi297 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding of this PR:

  • We stopped relying on the _finished_partitions to clean the parent state. This allows us to do the following point
  • We clean everything once we close a partition which the partition key has been fully processed (i.e. stream slices were created and were all closed)

The drawback is that we now have an integer in memory for each partition (which should be very limited as an impact). The thing I haven't thought of is how this works if the same partition key is emitted multiple time by the parent stream. Let's say we have:

  • Parent record ID = 1
  • Parent record ID = 2
  • Parent record ID = ...
  • Parent record ID = 1000
  • Parent record ID = 1

It feels like there are two cases here:

  • The memory items related to parent record ID = 1 are cleaned (because all stream slices have been processed) before we emit parent record ID = 1 again which means that we will cleaned stuff up for parent record ID = 1 and in that case, it is like we have never processed parent record ID = 1
  • We are still processing the stream slices for parent record ID = 1 before we emit it a second time. In that case, does that mean that we won't clean all the parent ids from 2 to 1000 until we reprocess all the parent record ID = 1 again?

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (5)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (4)

102-114: Consider adding documentation for the new complex state tracking.

The new data structures introduce sophisticated partition lifecycle management. While the implementation looks correct, the relationships between these structures could benefit from more documentation. Would you consider adding docstrings explaining how these work together? For example:

+        # Sequence-based partition tracking for memory management and parent state coordination:
+        # - _processing_partitions_indexes: Active partition sequence numbers (sorted)
+        # - _partition_key_to_index: Maps partition keys to their sequence numbers
+        # - _generated_partitions_count: Monotonic counter for sequence generation
         self._partition_parent_state_map: OrderedDict[str, tuple[Mapping[str, Any], int]] = (
             OrderedDict()
         )
🧰 Tools
🪛 Pylint (3.3.7)

[refactor] 110-110: Consider using [] instead of list()

(R1734)


284-288: Consider using a more Pythonic list initialization.

The sequence tracking logic looks correct and thread-safe since it's under the lock. However, static analysis suggests using [] instead of list() for initialization. Also, consider if using a deque might be more efficient for the frequent append/remove operations on _processing_partitions_indexes. Wdyt?

-        self._processing_partitions_indexes: List[int] = list()
+        self._processing_partitions_indexes: List[int] = []

467-485: Review the cleanup method for potential improvements.

The cleanup method addresses the memory issues well, but there are a few considerations:

  1. Performance: list.remove(seq) is O(n). Given that this could be called frequently, would using a more efficient data structure help?

  2. Logging: Static analysis suggests using lazy formatting for the debug log.

  3. Error handling: Should we handle the case where seq might not be in _processing_partitions_indexes?

     def _cleanup_if_done(self, partition_key: str) -> None:
         """
         Free every in-memory structure that belonged to a completed partition:
         cursor, semaphore, flag inside `_finished_partitions`
+        
+        Note: This method must be called while holding self._lock
         """
         if not (
             partition_key in self._partitions_done_generating_stream_slices
             and self._semaphore_per_partition[partition_key]._value == 0
         ):
             return

         self._semaphore_per_partition.pop(partition_key, None)
         self._partitions_done_generating_stream_slices.discard(partition_key)

-        seq = self._partition_key_to_index.pop(partition_key)
-        self._processing_partitions_indexes.remove(seq)
+        seq = self._partition_key_to_index.pop(partition_key, None)
+        if seq is not None:
+            try:
+                self._processing_partitions_indexes.remove(seq)
+            except ValueError:
+                logger.warning("Sequence %s not found in processing indexes for partition %s", seq, partition_key)

-        logger.debug(f"Partition {partition_key} fully processed and cleaned up.")
+        logger.debug("Partition %s fully processed and cleaned up.", partition_key)
🧰 Tools
🪛 Pylint (3.3.7)

[warning] 474-474: Access to a protected member _value of a client class

(W0212)


[warning] 484-484: Use lazy % formatting in logging functions

(W1203)


171-171: Consider alternatives to accessing semaphore's protected _value attribute.

The code accesses _value on the semaphore, which is a protected member. While this works, it might be more robust to use the public API or track the semaphore count separately. However, I understand this might be necessary for the current implementation. Wdyt about documenting why this approach is needed?

Also applies to: 474-474

🧰 Tools
🪛 Pylint (3.3.7)

[warning] 171-171: Access to a protected member _value of a client class

(W0212)

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)

3793-3800: Simple and focused helper function

The _make_inner_cursor helper function is well-designed - it creates a properly configured mock cursor with minimal setup. The function is focused and reusable.

One small suggestion: would it be worth adding a docstring that's a bit more descriptive about the mock's behavior, particularly that it yields exactly one slice? The current docstring is good but could be slightly more detailed, wdyt?

 def _make_inner_cursor(ts: str) -> MagicMock:
-    """Return an inner cursor that yields exactly one slice and has a proper state."""
+    """
+    Return a mock inner cursor for testing.
+    
+    Args:
+        ts: The timestamp to use for the cursor state
+        
+    Returns:
+        A MagicMock configured to yield exactly one dummy slice and return the given timestamp as state
+    """
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 276bb8c and 262370e.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (7 hunks)
  • unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (5 hunks)
🧰 Additional context used
🪛 Pylint (3.3.7)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

[refactor] 110-110: Consider using [] instead of list()

(R1734)


[warning] 171-171: Access to a protected member _value of a client class

(W0212)


[warning] 186-186: Unused variable 'earliest_key'

(W0612)


[refactor] 292-292: Do not raise StopIteration in generator, use return statement instead

(R1708)


[warning] 474-474: Access to a protected member _value of a client class

(W0212)


[warning] 484-484: Use lazy % formatting in logging functions

(W1203)

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

[warning] 3440-3440: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3441-3441: Access to a protected member _partitions_done_generating_stream_slices of a client class

(W0212)


[warning] 3442-3442: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3443-3443: Access to a protected member _partition_key_to_index of a client class

(W0212)


[warning] 3528-3528: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3529-3529: Access to a protected member _partitions_done_generating_stream_slices of a client class

(W0212)


[warning] 3530-3530: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3531-3531: Access to a protected member _partition_key_to_index of a client class

(W0212)


[warning] 3608-3608: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3609-3609: Access to a protected member _partitions_done_generating_stream_slices of a client class

(W0212)


[warning] 3610-3610: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3611-3611: Access to a protected member _partition_key_to_index of a client class

(W0212)


[warning] 3730-3730: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3731-3731: Access to a protected member _partition_key_to_index of a client class

(W0212)


[warning] 3732-3732: Access to a protected member _partition_parent_state_map of a client class

(W0212)


[warning] 3733-3733: Access to a protected member _partition_parent_state_map of a client class

(W0212)


[warning] 3740-3740: Access to a protected member _partitions_done_generating_stream_slices of a client class

(W0212)


[warning] 3741-3741: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3742-3742: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3743-3743: Access to a protected member _partition_key_to_index of a client class

(W0212)


[convention] 3803-3803: Missing function or method docstring

(C0116)


[warning] 3839-3839: Access to a protected member _partition_router of a client class

(W0212)


[warning] 3860-3860: Access to a protected member _IS_PARTITION_DUPLICATION_LOGGED of a client class

(W0212)


[warning] 3861-3861: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3862-3862: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3863-3863: Access to a protected member _partition_key_to_index of a client class

(W0212)


[convention] 3866-3866: Missing function or method docstring

(C0116)


[warning] 3893-3893: Access to a protected member _partition_router of a client class

(W0212)


[warning] 3910-3910: Access to a protected member _IS_PARTITION_DUPLICATION_LOGGED of a client class

(W0212)


[warning] 3911-3911: Access to a protected member _cursor_per_partition of a client class

(W0212)


[warning] 3912-3912: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3913-3913: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3914-3914: Access to a protected member _partition_key_to_index of a client class

(W0212)

⏰ Context from checks skipped due to timeout of 90000ms (5)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
🔇 Additional comments (10)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (3)

69-70: Constants look good, but consider the past feedback about readability.

Nice addition of the constants! This addresses the previous feedback about using PARENT_STATE = 0 and GENERATION_SEQUENCE = 1 for better readability. The naming clearly indicates their purpose in tuple indexing.


279-279: Early return for duplicates is a clean solution.

This early return approach for handling duplicate partitions is much cleaner than the previous logic. It prevents unnecessary processing and maintains the warning for debugging purposes. Nice work!


170-177:

✅ Verification successful

Good integration of cleanup logic, but verify thread safety.

The cleanup integration looks well-placed after the global cursor update. However, I want to verify that _cleanup_if_done is always called under the lock as intended, since it accesses shared data structures. The implementation looks correct, but wdyt about adding an assertion to make this requirement explicit?


🏁 Script executed:

#!/bin/bash
# Verify that _cleanup_if_done is only called from contexts that hold the lock
rg -A 5 -B 5 "_cleanup_if_done"

Length of output: 2563


🏁 Script executed:

#!/bin/bash
# Inspect the initial part of the concurrent_partition_cursor to locate lock definitions and usage
sed -n '1,300p' airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Length of output: 12533


Thread safety confirmed for _cleanup_if_done

I checked the calls to _cleanup_if_done and it’s only ever invoked inside the with self._lock: block in close_partition (no other call sites). The lock guarantees that shared state is protected. Adding an assertion in _cleanup_if_done (e.g. switching to an RLock and asserting ownership) could make this even more explicit, but isn’t strictly required given the current usage.

Wdyt—should we leave it as is or introduce an explicit lock-ownership check?

🧰 Tools
🪛 Pylint (3.3.7)

[warning] 171-171: Access to a protected member _value of a client class

(W0212)

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (7)

3439-3443: LGTM: Enhanced internal state tracking validation

These new assertions properly validate the sequence-based partition tracking structures mentioned in the PR objectives. The checks ensure that internal state management is working correctly after partition operations.

The static analysis warnings about accessing protected members are expected and appropriate in test code that needs to verify internal implementation details, wdyt?

🧰 Tools
🪛 Pylint (3.3.7)

[warning] 3440-3440: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3441-3441: Access to a protected member _partitions_done_generating_stream_slices of a client class

(W0212)


[warning] 3442-3442: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3443-3443: Access to a protected member _partition_key_to_index of a client class

(W0212)


3527-3531: Consistent internal state validation

Good consistency with the previous assertions pattern. These checks validate the same internal tracking structures to ensure proper cleanup behavior across different test scenarios.

🧰 Tools
🪛 Pylint (3.3.7)

[warning] 3528-3528: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3529-3529: Access to a protected member _partitions_done_generating_stream_slices of a client class

(W0212)


[warning] 3530-3530: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3531-3531: Access to a protected member _partition_key_to_index of a client class

(W0212)


3606-3611: Comprehensive cleanup verification

Excellent addition of comprehensive cleanup verification! This ensures that all the new internal tracking structures are properly cleaned up when all partitions are finished, which is crucial for preventing memory leaks in the concurrent processing implementation.

🧰 Tools
🪛 Pylint (3.3.7)

[warning] 3608-3608: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3609-3609: Access to a protected member _partitions_done_generating_stream_slices of a client class

(W0212)


[warning] 3610-3610: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3611-3611: Access to a protected member _partition_key_to_index of a client class

(W0212)


3730-3733: Detailed partition state verification

Great addition of more granular assertions that verify specific parent state values in addition to collection lengths. This provides better test coverage of the partition lifecycle and parent state management logic.

🧰 Tools
🪛 Pylint (3.3.7)

[warning] 3730-3730: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3731-3731: Access to a protected member _partition_key_to_index of a client class

(W0212)


[warning] 3732-3732: Access to a protected member _partition_parent_state_map of a client class

(W0212)


[warning] 3733-3733: Access to a protected member _partition_parent_state_map of a client class

(W0212)


3740-3743: Final cleanup validation

Perfect final verification that all internal structures are cleaned up after partition processing completes. This completes the comprehensive cleanup testing strategy.

🧰 Tools
🪛 Pylint (3.3.7)

[warning] 3740-3740: Access to a protected member _partitions_done_generating_stream_slices of a client class

(W0212)


[warning] 3741-3741: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3742-3742: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3743-3743: Access to a protected member _partition_key_to_index of a client class

(W0212)


3803-3864: Excellent duplicate partition handling test

This test provides great coverage for the duplicate partition scenario where partitions are cleaned up before duplicates arrive. The test logic is clear and well-structured:

  1. Sets up a sequence of partitions with a duplicate
  2. Processes them with cleanup between iterations
  3. Verifies that no duplication is detected and cleanup works properly

The test covers an important edge case in concurrent partition processing. Well done!

🧰 Tools
🪛 Pylint (3.3.7)

[convention] 3803-3803: Missing function or method docstring

(C0116)


[warning] 3839-3839: Access to a protected member _partition_router of a client class

(W0212)


[warning] 3860-3860: Access to a protected member _IS_PARTITION_DUPLICATION_LOGGED of a client class

(W0212)


[warning] 3861-3861: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3862-3862: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3863-3863: Access to a protected member _partition_key_to_index of a client class

(W0212)


3866-3914: Thorough duplicate detection during processing

Excellent test for detecting duplicates while partitions are still being processed! This covers the complementary scenario to the previous test. Key strengths:

  1. Tests the case where duplicates are detected before cleanup
  2. Verifies that the warning logging flag is properly set
  3. Ensures that only unique partitions are emitted (2 out of 3)
  4. Validates proper cleanup after processing

This provides comprehensive coverage of duplicate partition handling scenarios, which is crucial for the reliability of concurrent processing, wdyt?

🧰 Tools
🪛 Pylint (3.3.7)

[convention] 3866-3866: Missing function or method docstring

(C0116)


[warning] 3893-3893: Access to a protected member _partition_router of a client class

(W0212)


[warning] 3910-3910: Access to a protected member _IS_PARTITION_DUPLICATION_LOGGED of a client class

(W0212)


[warning] 3911-3911: Access to a protected member _cursor_per_partition of a client class

(W0212)


[warning] 3912-3912: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3913-3913: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3914-3914: Access to a protected member _partition_key_to_index of a client class

(W0212)

@tolik0 tolik0 requested a review from lazebnyi June 3, 2025 17:52
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (3)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (3)

3793-3800: Consider adding a docstring for the helper function?

The _make_inner_cursor function is a useful helper, but it's missing documentation. Would you consider adding a brief docstring to describe its purpose and return value? Something like:

def _make_inner_cursor(ts: str) -> MagicMock:
    """Create a mock inner cursor with a single slice and specified timestamp state.
    
    Args:
        ts: The timestamp string to use for the cursor state
        
    Returns:
        A configured MagicMock representing an inner cursor
    """

3867-3932: Great coverage for the cursor-exists duplicate scenario!

This test nicely complements the previous one by covering the case where the duplicate partition appears but the cursor still exists. The verification that _IS_PARTITION_DUPLICATION_LOGGED remains False and that only 2 cursors are kept (for "1" & "2") is exactly right for this scenario.

Consider adding a docstring to explain the test's purpose? Something like:

def test_duplicate_partition_after_closing_partition_cursor_exists():
    """Test duplicate partition handling when the original cursor still exists after closing."""
🧰 Tools
🪛 Pylint (3.3.7)

[convention] 3867-3867: Missing function or method docstring

(C0116)


[warning] 3901-3901: Access to a protected member _partition_router of a client class

(W0212)


[warning] 3926-3926: Access to a protected member _IS_PARTITION_DUPLICATION_LOGGED of a client class

(W0212)


[warning] 3927-3927: Access to a protected member _cursor_per_partition of a client class

(W0212)


[warning] 3928-3928: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3929-3929: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3930-3930: Access to a protected member _partition_key_to_index of a client class

(W0212)


[warning] 3931-3931: Access to a protected member _partitions_done_generating_stream_slices of a client class

(W0212)


3934-3983: Comprehensive duplicate detection while processing!

This test covers the important edge case where a duplicate partition is detected while the original is still being processed. The key assertions are spot-on:

  • Only 2 slices generated (duplicate skipped)
  • _IS_PARTITION_DUPLICATION_LOGGED is True (warning emitted)
  • Proper cleanup of all tracking structures

The test demonstrates the duplicate detection working as intended. Would you consider adding a docstring here too for consistency?

🧰 Tools
🪛 Pylint (3.3.7)

[convention] 3934-3934: Missing function or method docstring

(C0116)


[warning] 3961-3961: Access to a protected member _partition_router of a client class

(W0212)


[warning] 3978-3978: Access to a protected member _IS_PARTITION_DUPLICATION_LOGGED of a client class

(W0212)


[warning] 3979-3979: Access to a protected member _cursor_per_partition of a client class

(W0212)


[warning] 3980-3980: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3981-3981: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3982-3982: Access to a protected member _partition_key_to_index of a client class

(W0212)


[warning] 3983-3983: Access to a protected member _partitions_done_generating_stream_slices of a client class

(W0212)

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 262370e and dd49c3d.

📒 Files selected for processing (1)
  • unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (5 hunks)
🧰 Additional context used
🪛 Pylint (3.3.7)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

[warning] 3440-3440: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3441-3441: Access to a protected member _partitions_done_generating_stream_slices of a client class

(W0212)


[warning] 3442-3442: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3443-3443: Access to a protected member _partition_key_to_index of a client class

(W0212)


[warning] 3528-3528: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3529-3529: Access to a protected member _partitions_done_generating_stream_slices of a client class

(W0212)


[warning] 3530-3530: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3531-3531: Access to a protected member _partition_key_to_index of a client class

(W0212)


[warning] 3608-3608: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3609-3609: Access to a protected member _partitions_done_generating_stream_slices of a client class

(W0212)


[warning] 3610-3610: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3611-3611: Access to a protected member _partition_key_to_index of a client class

(W0212)


[warning] 3730-3730: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3731-3731: Access to a protected member _partition_key_to_index of a client class

(W0212)


[warning] 3732-3732: Access to a protected member _partition_parent_state_map of a client class

(W0212)


[warning] 3733-3733: Access to a protected member _partition_parent_state_map of a client class

(W0212)


[warning] 3740-3740: Access to a protected member _partitions_done_generating_stream_slices of a client class

(W0212)


[warning] 3741-3741: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3742-3742: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3743-3743: Access to a protected member _partition_key_to_index of a client class

(W0212)


[convention] 3803-3803: Missing function or method docstring

(C0116)


[warning] 3839-3839: Access to a protected member _partition_router of a client class

(W0212)


[warning] 3860-3860: Access to a protected member _IS_PARTITION_DUPLICATION_LOGGED of a client class

(W0212)


[warning] 3861-3861: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3862-3862: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3863-3863: Access to a protected member _partition_key_to_index of a client class

(W0212)


[warning] 3864-3864: Access to a protected member _partitions_done_generating_stream_slices of a client class

(W0212)


[convention] 3867-3867: Missing function or method docstring

(C0116)


[warning] 3901-3901: Access to a protected member _partition_router of a client class

(W0212)


[warning] 3926-3926: Access to a protected member _IS_PARTITION_DUPLICATION_LOGGED of a client class

(W0212)


[warning] 3927-3927: Access to a protected member _cursor_per_partition of a client class

(W0212)


[warning] 3928-3928: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3929-3929: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3930-3930: Access to a protected member _partition_key_to_index of a client class

(W0212)


[warning] 3931-3931: Access to a protected member _partitions_done_generating_stream_slices of a client class

(W0212)


[convention] 3934-3934: Missing function or method docstring

(C0116)


[warning] 3961-3961: Access to a protected member _partition_router of a client class

(W0212)


[warning] 3978-3978: Access to a protected member _IS_PARTITION_DUPLICATION_LOGGED of a client class

(W0212)


[warning] 3979-3979: Access to a protected member _cursor_per_partition of a client class

(W0212)


[warning] 3980-3980: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3981-3981: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3982-3982: Access to a protected member _partition_key_to_index of a client class

(W0212)


[warning] 3983-3983: Access to a protected member _partitions_done_generating_stream_slices of a client class

(W0212)

⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
🔇 Additional comments (6)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (6)

3439-3443: Enhanced internal state verification looks solid!

These new assertions effectively verify that the sequence-based partition tracking structures are properly maintained. The checks for _partitions_done_generating_stream_slices, _processing_partitions_indexes, and _partition_key_to_index provide good coverage of the internal state management. The assertion that there's exactly 1 item in each structure makes sense given the test context where one partition is still being processed.

🧰 Tools
🪛 Pylint (3.3.7)

[warning] 3440-3440: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3441-3441: Access to a protected member _partitions_done_generating_stream_slices of a client class

(W0212)


[warning] 3442-3442: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3443-3443: Access to a protected member _partition_key_to_index of a client class

(W0212)


3527-3531: Consistent state verification pattern - nice!

Good to see the same internal state assertions being applied consistently across different test scenarios. This helps ensure the partition lifecycle management works correctly in various contexts.

🧰 Tools
🪛 Pylint (3.3.7)

[warning] 3528-3528: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3529-3529: Access to a protected member _partitions_done_generating_stream_slices of a client class

(W0212)


[warning] 3530-3530: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3531-3531: Access to a protected member _partition_key_to_index of a client class

(W0212)


3606-3611: Excellent cleanup verification!

These assertions are crucial for verifying that the memory issues mentioned in the PR title are properly addressed. Ensuring all internal tracking structures are cleaned up to length 0 after partition closure is exactly what we need to prevent memory leaks, wdyt?

🧰 Tools
🪛 Pylint (3.3.7)

[warning] 3608-3608: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3609-3609: Access to a protected member _partitions_done_generating_stream_slices of a client class

(W0212)


[warning] 3610-3610: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3611-3611: Access to a protected member _partition_key_to_index of a client class

(W0212)


3730-3733: Comprehensive state tracking verification!

The addition of _processing_partitions_indexes and _partition_key_to_index checks, along with the specific verification of _partition_parent_state_map contents, provides thorough coverage of the new sequence-based tracking. The assertions verify both the structure sizes and the actual content, which is great for catching logic errors.

🧰 Tools
🪛 Pylint (3.3.7)

[warning] 3730-3730: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3731-3731: Access to a protected member _partition_key_to_index of a client class

(W0212)


[warning] 3732-3732: Access to a protected member _partition_parent_state_map of a client class

(W0212)


[warning] 3733-3733: Access to a protected member _partition_parent_state_map of a client class

(W0212)


3740-3743: Perfect cleanup validation!

These assertions ensure that after closing partitions, all the new tracking structures are properly cleaned up, which directly addresses the memory management concerns mentioned in the PR objectives.

🧰 Tools
🪛 Pylint (3.3.7)

[warning] 3740-3740: Access to a protected member _partitions_done_generating_stream_slices of a client class

(W0212)


[warning] 3741-3741: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3742-3742: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3743-3743: Access to a protected member _partition_key_to_index of a client class

(W0212)


3803-3865: Excellent duplicate partition test coverage!

This test effectively covers the scenario where a duplicate partition appears after the original cursor has been cleaned up. The test structure is well-organized:

  1. Sets up the partition sequence correctly (1 → 2 → 1)
  2. Uses lazy iteration to ensure proper timing
  3. Verifies that no duplicate warning is logged (since the cursor was cleaned)
  4. Confirms all internal structures are properly cleaned up

The logic makes sense - when the first "1" is closed and cleaned up before the second "1" arrives, it should be treated as a new partition rather than a duplicate, wdyt?

🧰 Tools
🪛 Pylint (3.3.7)

[convention] 3803-3803: Missing function or method docstring

(C0116)


[warning] 3839-3839: Access to a protected member _partition_router of a client class

(W0212)


[warning] 3860-3860: Access to a protected member _IS_PARTITION_DUPLICATION_LOGGED of a client class

(W0212)


[warning] 3861-3861: Access to a protected member _semaphore_per_partition of a client class

(W0212)


[warning] 3862-3862: Access to a protected member _processing_partitions_indexes of a client class

(W0212)


[warning] 3863-3863: Access to a protected member _partition_key_to_index of a client class

(W0212)


[warning] 3864-3864: Access to a protected member _partitions_done_generating_stream_slices of a client class

(W0212)

Copy link
Contributor

@maxi297 maxi297 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just one last question

Copy link
Contributor

@maxi297 maxi297 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approved! Let's test this with the connector

Copy link
Contributor

@lazebnyi lazebnyi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm
great job!

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)

1-1: Address the pipeline failure by running Ruff formatting.

The GitHub Actions pipeline is failing due to formatting issues. Could you run ruff format to fix the code style issues? wdyt?

🧰 Tools
🪛 GitHub Actions: Linters

[error] 1-1: Ruff formatting check failed. File would be reformatted. Run 'ruff format' to fix code style issues.


186-200: Fix the unused variable flagged in past reviews.

The earliest_key variable is still unused as flagged in previous reviews. Since we only need the state and sequence values, could you address this? wdyt?

-            earliest_key, (candidate_state, candidate_seq) = next(
-                iter(self._partition_parent_state_map.items())
-            )
+            _, (candidate_state, candidate_seq) = next(
+                iter(self._partition_parent_state_map.items())
+            )
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)

110-110: Consider using list literal for initialization.

The static analysis tool suggests using [] instead of list() for better performance and readability. wdyt about this change?

-        self._processing_partitions_indexes: List[int] = list()
+        self._processing_partitions_indexes: List[int] = []
🧰 Tools
🪛 Pylint (3.3.7)

[refactor] 110-110: Consider using [] instead of list()

(R1734)


485-486: Consider error handling for sequence removal.

While the logic looks correct, the remove() operation could potentially raise a ValueError if the sequence isn't found. Would you consider adding error handling here for robustness? wdyt?

         seq = self._partition_key_to_index.pop(partition_key)
-        self._processing_partitions_indexes.remove(seq)
+        try:
+            self._processing_partitions_indexes.remove(seq)
+        except ValueError:
+            logger.warning(f"Sequence {seq} not found in processing indexes for partition {partition_key}")
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between dd49c3d and fc4728c.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (9 hunks)
🧰 Additional context used
🪛 Pylint (3.3.7)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

[refactor] 110-110: Consider using [] instead of list()

(R1734)


[refactor] 292-292: Do not raise StopIteration in generator, use return statement instead

(R1708)

🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

[error] 1-1: Ruff formatting check failed. File would be reformatted. Run 'ruff format' to fix code style issues.

⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Publish SDM to DockerHub
  • GitHub Check: Analyze (python)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
🔇 Additional comments (6)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (6)

69-70: Great improvement using constants for tuple indices!

This addresses the previous review suggestion about using named constants instead of magic numbers. Much more readable now!


102-114: The sequence-based tracking architecture looks solid.

The new data structures for managing partition lifecycle and parent state updates appear well-designed. The separation of concerns between sequence tracking, completion status, and index mapping should improve memory management significantly.

🧰 Tools
🪛 Pylint (3.3.7)

[refactor] 110-110: Consider using [] instead of list()

(R1734)


170-177: Good addition of cleanup logic in partition closing.

The integration of _cleanup_if_done here ensures proper resource deallocation when partitions complete. The logic correctly checks both completion status and semaphore value before cleanup.


279-312: Excellent implementation of duplicate detection and sequence tracking.

The early return for duplicate partitions prevents processing issues, and the sequence-based tracking with proper cleanup in the finally block looks robust. The logic correctly handles both normal completion and exceptional cases.

🧰 Tools
🪛 Pylint (3.3.7)

[refactor] 292-292: Do not raise StopIteration in generator, use return statement instead

(R1708)


338-338: Smart use of the new index tracking for partition cleanup.

Using _partition_key_to_index to identify finished partitions for removal is more efficient than the previous approach. This should improve the partition limit enforcement logic.


471-489: Well-designed cleanup method addresses the memory issues.

This method effectively addresses the PR's objective of fixing memory issues by properly cleaning up all partition-related structures. The logic correctly validates completion conditions before cleanup and maintains consistency across all tracking data structures.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/interpolation/jinja.py (2)

148-148: Good catch on removing the cache to address memory issues! But should we consider a bounded cache alternative, wdyt?

Removing the @cache decorator from _find_undeclared_variables makes sense for memory management, especially in concurrent scenarios where many different template strings could accumulate in the cache. However, this might impact performance if the same templates are parsed repeatedly.

Would you consider implementing a bounded cache (like functools.lru_cache(maxsize=128)) as a middle ground between memory efficiency and performance, wdyt?


156-156: Removing template compilation cache addresses memory concerns - any thoughts on performance impact monitoring, wdyt?

Removing the @cache decorator from _compile is a solid approach to prevent memory accumulation from cached Template objects, which can be quite large. This change aligns well with the PR's memory fix objectives.

Since template compilation can be computationally expensive, would it be worth adding some performance monitoring or metrics to track if this change introduces any noticeable performance regression in production, wdyt?

-    # @cache
+    # @functools.lru_cache(maxsize=100)  # Alternative: bounded cache
     def _compile(self, s: str) -> Template:
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fc4728c and 9057702.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/interpolation/jinja.py (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Publish SDM to DockerHub
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Analyze (python)

Copy link

github-actions bot commented Jun 6, 2025

PyTest Results (Fast)

3 659 tests   3 649 ✅  5m 34s ⏱️
    1 suites     10 💤
    1 files        0 ❌

Results for commit d3b3f52.

♻️ This comment has been updated with latest results.

Copy link

github-actions bot commented Jun 6, 2025

PyTest Results (Full)

3 662 tests   3 652 ✅  17m 22s ⏱️
    1 suites     10 💤
    1 files        0 ❌

Results for commit d3b3f52.

♻️ This comment has been updated with latest results.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)

186-199: Fix the unused variable flagged by static analysis.

The sequence-based logic for parent state updates looks much more efficient than checking all partitions. However, there's still an unused variable that should be addressed. Since we only need the state and sequence, wdyt about this fix?

-            earliest_key, (candidate_state, candidate_seq) = next(
+            _, (candidate_state, candidate_seq) = next(
                 iter(self._partition_parent_state_map.items())
             )
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)

102-114: Nice addition of explicit partition lifecycle tracking.

These new data structures provide comprehensive tracking for partition states and sequences, which should help with the memory management issues. The approach using sequence numbers for coordination looks solid.

One minor suggestion - wdyt about using [] instead of list() on line 110 as suggested by static analysis? It's slightly more idiomatic in Python.

-        self._processing_partitions_indexes: List[int] = list()
+        self._processing_partitions_indexes: List[int] = []
🧰 Tools
🪛 Pylint (3.3.7)

[refactor] 110-110: Consider using [] instead of list()

(R1734)

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9057702 and d3b3f52.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (7 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (4)
airbyte_cdk/sources/streams/checkpoint/per_partition_key_serializer.py (1)
  • PerPartitionKeySerializer (7-22)
airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py (3)
  • Timer (50-68)
  • iterate_with_last_flag_and_state (17-47)
  • stream_slices (102-132)
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (2)
  • stream_slices (150-231)
  • logger (421-422)
airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py (1)
  • stream_slices (61-64)
🪛 Pylint (3.3.7)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

[refactor] 110-110: Consider using [] instead of list()

(R1734)


[refactor] 292-292: Do not raise StopIteration in generator, use return statement instead

(R1708)

⏰ Context from checks skipped due to timeout of 90000ms (13)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Build and Inspect Python Package
  • GitHub Check: Dependency Analysis with Deptry
  • GitHub Check: Ruff Format Check
  • GitHub Check: preview_docs
  • GitHub Check: update_release_draft
  • GitHub Check: MyPy Check
  • GitHub Check: Ruff Lint Check
  • GitHub Check: Validate PR title
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Analyze (python)
🔇 Additional comments (7)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (7)

12-12: LGTM on the import addition.

Adding List to the type imports supports the new type annotations in the code. Good practice for type safety!


69-70: Great improvement using named constants!

This directly addresses the past review feedback about readability. Using _PARENT_STATE = 0 and _GENERATION_SEQUENCE = 1 makes the tuple indexing much clearer than magic numbers.


176-176: Good integration of cleanup logic.

Calling _cleanup_if_done after the global cursor update ensures that completed partitions are properly cleaned up, which should address the memory issues mentioned in the PR title.


279-279: Smart early return for duplicate partitions.

Returning immediately when a duplicate partition is detected prevents unnecessary processing and potential memory issues. This is a clean solution!


284-296: Excellent sequence tracking implementation.

The logic here properly assigns unique sequence numbers to each partition and maintains all the tracking data structures consistently. The sequence-based approach should make parent state coordination much more reliable.

🧰 Tools
🪛 Pylint (3.3.7)

[refactor] 292-292: Do not raise StopIteration in generator, use return statement instead

(R1708)


334-334: Good refinement of partition cleanup detection.

Using _partition_key_to_index to determine which partitions can be removed is more precise than the previous approach. This should work well with the new lifecycle tracking.


467-485: Excellent cleanup method implementation.

This is the core of the memory fix and looks very solid. The method:

  • Properly checks completion conditions before cleanup
  • Maintains consistency across all tracking data structures
  • Uses safe operations (pop with default, discard)
  • Includes helpful debug logging

The implementation should effectively prevent memory leaks from completed partitions. Nice work on this!

@tolik0
Copy link
Contributor Author

tolik0 commented Jun 11, 2025

Regression tests:

@tolik0 tolik0 merged commit 795a896 into main Jun 12, 2025
34 checks passed
@tolik0 tolik0 deleted the tolik0/concurrent-perpartition/fix-memory-issue branch June 12, 2025 11:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants