fix(concurrent-perpartition-cursor): Fix memory issues #568
📝 Walkthrough
The update introduces explicit sequence tracking and enhanced cleanup logic for partition lifecycle management in the concurrent partition cursor. Internal data structures are refined to better manage partition states, sequence numbers, and resource cleanup. Unit tests are expanded and new tests are added to cover duplicate partition handling and verify the correctness of internal state management.
Sequence Diagram(s)
sequenceDiagram
participant ParentCursor
participant ConcurrentCursor
participant Partition
participant InnerCursor
ParentCursor->>ConcurrentCursor: Create new partition
ConcurrentCursor->>Partition: Assign sequence number, track state
Partition->>InnerCursor: Generate slices
InnerCursor-->>Partition: Return slices
Partition-->>ConcurrentCursor: Mark partition as done
ConcurrentCursor->>ConcurrentCursor: Cleanup partition resources
ConcurrentCursor->>ParentCursor: Update parent state (if all prior partitions complete)
Would you like to also see a diagram for the duplicate partition handling logic, or does this overview suffice for your needs? Wdyt?
Actionable comments posted: 2
🔭 Outside diff range comments (1)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)
1-1: ⚠️ Potential issue: Fix the formatting issue to pass pipeline checks
The pipeline is failing due to formatting issues. Could you run `ruff format` on this file to fix the code style issues? This will help ensure the PR passes all checks, wdyt?
🧰 Tools
🪛 GitHub Actions: Linters
[error] 1-1: Ruff formatting check failed. File would be reformatted. Run 'ruff format' to fix code style issues.
🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
102-108: Well-designed sequence tracking data structures!
The choice of data structures is appropriate:
- `OrderedDict` for maintaining insertion order
- `deque` for efficient sequence tracking
- Simple counter for sequence generation
Would it be helpful to add a brief comment explaining the relationship between these new attributes? Something like "These attributes work together to track partition processing order and enable safe parent state updates"? wdyt?
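To make the relationship between the three structures concrete, here is a minimal sketch. The class name `SequenceTracker`, the method `register`, and the public attribute names are hypothetical; they only mirror the roles the comment describes and are not the PR's actual code.

```python
from collections import OrderedDict, deque
from typing import Any, Deque, Dict, Mapping, Tuple


class SequenceTracker:
    """Hypothetical stand-in for the cursor's sequence-tracking attributes."""

    def __init__(self) -> None:
        # partition key -> (parent state snapshot, sequence number), kept in insertion order
        self.partition_parent_state_map: "OrderedDict[str, Tuple[Mapping[str, Any], int]]" = (
            OrderedDict()
        )
        # sequence numbers of partitions that are still open, oldest first
        self.open_seqs: Deque[int] = deque()
        # partition key -> sequence number, used to find the entry to clean up later
        self.seq_by_partition: Dict[str, int] = {}
        # simple monotonic counter used to generate sequence numbers
        self.next_seq = 0

    def register(self, partition_key: str, parent_state: Mapping[str, Any]) -> int:
        """Assign the next sequence number and record it in all three structures."""
        seq = self.next_seq
        self.next_seq += 1
        self.open_seqs.append(seq)
        self.seq_by_partition[partition_key] = seq
        self.partition_parent_state_map[partition_key] = (dict(parent_state), seq)
        return seq
```

In the real cursor this bookkeeping would run under the cursor's lock; the sketch leaves locking out to keep the data flow visible.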
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (2)
3439-3440: Verify the semaphore cleanup behavior aligns with the new implementation
I see you've updated the expected semaphore count from 2 to 1, reflecting that semaphores are now cleaned up after partition completion. This looks correct based on the PR's objective to fix memory issues.
The TODO comment mentions adding checks for other internal values - would you like to expand on what specific internal state should be validated here? Perhaps checking `_partition_parent_state_map` or `_finished_partitions` could strengthen this test, wdyt?
3523-3524: Confirm the zero semaphore expectation is correct
The assertion now expects 0 semaphores instead of 1, which is consistent with the more aggressive cleanup behavior mentioned in the AI summary. This aligns well with the memory issue fixes.
Similar to the previous test, the TODO suggests adding more internal state checks. Would it be helpful to validate the state of `_cursor_per_partition` or other cleanup-related internal structures here as well, wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (6 hunks)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (2 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
[error] 1-1: Ruff formatting check failed. File would be reformatted. Run 'ruff format' to fix code style issues.
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
[error] 5-29: Ruff: Import block is un-sorted or un-formatted. Organize imports. 1 fixable error with the --fix option.
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (4)
169-171: Good placement of cleanup logic.
Cleaning up resources after updating the global cursor ensures that state is consistent before freeing memory.
180-189: Elegant sequence-based parent state update logic!
The implementation correctly ensures that parent state is only updated when all partitions with lower or equal sequence numbers have been closed. The use of `deque[0]` for O(1) access to the oldest open sequence is efficient.
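The rule described above can be sketched as a small standalone function. This is an illustration under assumptions, not the PR's implementation: `advance_parent_state` and its parameters are hypothetical names, and the stop condition is derived only from the sentence "parent state is only updated when all partitions with lower or equal sequence numbers have been closed".

```python
from collections import OrderedDict, deque
from typing import Any, Deque, Mapping, Optional, Tuple


def advance_parent_state(
    state_map: "OrderedDict[str, Tuple[Mapping[str, Any], int]]",
    open_seqs: Deque[int],
    current: Optional[Mapping[str, Any]] = None,
) -> Optional[Mapping[str, Any]]:
    """Pop candidates oldest-first while no older-or-equal sequence is still open."""
    latest = current
    while state_map:
        _, (candidate_state, candidate_seq) = next(iter(state_map.items()))
        # open_seqs[0] is the oldest open sequence; reading it is O(1)
        if open_seqs and open_seqs[0] <= candidate_seq:
            break  # the candidate, or something before it, is still in progress
        latest = candidate_state
        state_map.popitem(last=False)  # drop the entry we just committed
    return latest
```

With sequences 1 and 2 still open and sequence 0 closed, only the first candidate is committed; once `open_seqs` is empty, every remaining candidate is consumed.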
273-277: Proper thread-safe sequence assignment.
The sequence assignment is correctly synchronized under the lock, ensuring unique sequences for each partition.
323-323: Consistent partition removal check.
Good change! Checking for presence in `_seq_by_partition` is more reliable since cleaned-up partitions are removed from this mapping.
Actionable comments posted: 0
♻️ Duplicate comments (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)
9-9: Import formatting issue persists.
This is the same import sorting issue that was flagged in previous reviews and is causing pipeline failures. Could you run `ruff --fix` to resolve this automatically? wdyt?
463-481: ⚠️ Potential issue: Thread safety and performance concerns in cleanup method.
This method has the same issues flagged in previous reviews:
- Thread safety: The method accesses shared data structures without explicit locking documentation
- Performance: `deque.remove(seq)` on line 478 is an O(n) operation
- Protected member access: Line 470 accesses `_value`, which static analysis flags
- Logging format: Line 480 should use lazy formatting
Would you consider addressing these concerns as suggested in the previous review? wdyt?
  def _cleanup_if_done(self, partition_key: str) -> None:
      """
      Free every in-memory structure that belonged to a completed partition:
      cursor, semaphore, flag inside `_finished_partitions`
+
+     Note: This method must be called while holding self._lock
      """
+     assert self._lock.locked(), "_cleanup_if_done must be called while holding the lock"
+
      if not (
          partition_key in self._finished_partitions
-         and self._semaphore_per_partition[partition_key]._value == 0
+         and self._semaphore_per_partition[partition_key]._value == 0  # type: ignore[attr-defined]
      ):
          return
  
      self._semaphore_per_partition.pop(partition_key, None)
      self._finished_partitions.discard(partition_key)
  
-     seq = self._seq_by_partition.pop(partition_key)
-     self._open_seqs.remove(seq)
+     seq = self._seq_by_partition.pop(partition_key, None)
+     if seq is not None:
+         try:
+             self._open_seqs.remove(seq)
+         except ValueError:
+             logger.warning("Sequence %s not found in open_seqs for partition %s", seq, partition_key)
  
-     logger.debug(f"Partition {partition_key} fully processed and cleaned up.")
+     logger.debug("Partition %s fully processed and cleaned up.", partition_key)
🧰 Tools
🪛 Pylint (3.3.7)
[warning] 470-470: Access to a protected member _value of a client class
(W0212)
[warning] 480-480: Use lazy % formatting in logging functions
(W1203)
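For readers unfamiliar with W1203, the sketch below shows why lazy `%` formatting matters. With an f-string the message is built before `logger.debug` is even called; with `%s` arguments the `logging` module only formats the message if the record is actually emitted. The `Expensive` class and the logger name are illustrative only.

```python
import logging


class Expensive:
    """Counts how often it is rendered to a string."""

    def __init__(self) -> None:
        self.calls = 0

    def __str__(self) -> str:
        self.calls += 1
        return "partition-1"


logger = logging.getLogger("lazy_logging_sketch")
logger.setLevel(logging.INFO)  # DEBUG records are discarded


def log_eager(key: Expensive) -> None:
    # The f-string is evaluated even though the record is dropped.
    logger.debug(f"Partition {key} fully processed and cleaned up.")


def log_lazy(key: Expensive) -> None:
    # Formatting is deferred; nothing is rendered when DEBUG is disabled.
    logger.debug("Partition %s fully processed and cleaned up.", key)
```

The saving is small per call, but cleanup runs once per partition, so it adds up on streams with many partitions.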
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
182-200
: Address the unused variable warning.The static analysis tool flagged
earliest_key
as unused on line 186. Since you're only using the value part of the tuple, would you consider using_
for the unused key? wdyt?- earliest_key, (candidate_state, candidate_seq) = next( + _, (candidate_state, candidate_seq) = next( iter(self._partition_parent_state_map.items()) )🧰 Tools
🪛 Pylint (3.3.7)
[warning] 186-186: Unused variable 'earliest_key'
(W0612)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (6 hunks)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (4 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (3)
airbyte_cdk/sources/streams/checkpoint/per_partition_key_serializer.py (1)
  PerPartitionKeySerializer (7-22)
airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py (1)
  Timer (50-68)
airbyte_cdk/sources/streams/concurrent/cursor.py (1)
  _emit_state_message (274-283)
🪛 Pylint (3.3.7)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
[warning] 186-186: Unused variable 'earliest_key'
(W0612)
[refactor] 288-288: Do not raise StopIteration in generator, use return statement instead
(R1708)
[warning] 470-470: Access to a protected member _value of a client class
(W0212)
[warning] 480-480: Use lazy % formatting in logging functions
(W1203)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
[warning] 3440-3440: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3441-3441: Access to a protected member _finished_partitions of a client class
(W0212)
[warning] 3442-3442: Access to a protected member _open_seqs of a client class
(W0212)
[warning] 3443-3443: Access to a protected member _seq_by_partition of a client class
(W0212)
[warning] 3528-3528: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3529-3529: Access to a protected member _finished_partitions of a client class
(W0212)
[warning] 3530-3530: Access to a protected member _open_seqs of a client class
(W0212)
[warning] 3531-3531: Access to a protected member _seq_by_partition of a client class
(W0212)
[warning] 3608-3608: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3609-3609: Access to a protected member _finished_partitions of a client class
(W0212)
[warning] 3610-3610: Access to a protected member _open_seqs of a client class
(W0212)
[warning] 3611-3611: Access to a protected member _seq_by_partition of a client class
(W0212)
[warning] 3730-3730: Access to a protected member _open_seqs of a client class
(W0212)
[warning] 3731-3731: Access to a protected member _seq_by_partition of a client class
(W0212)
[warning] 3732-3732: Access to a protected member _partition_parent_state_map of a client class
(W0212)
[warning] 3733-3733: Access to a protected member _partition_parent_state_map of a client class
(W0212)
[warning] 3740-3740: Access to a protected member _finished_partitions of a client class
(W0212)
[warning] 3741-3741: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3742-3742: Access to a protected member _open_seqs of a client class
(W0212)
[warning] 3743-3743: Access to a protected member _seq_by_partition of a client class
(W0212)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
[error] 5-29: Ruff import sorting error: Import block is un-sorted or un-formatted. Run 'ruff --fix' to automatically fix import order.
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
🔇 Additional comments (9)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (4)
102-114: The sequence tracking data structures look well-designed.
The new attributes for sequence-based tracking provide a clear approach to managing partition lifecycle and parent state updates. The type hints are correct and the comments are helpful.
175-176: Good addition of cleanup call.
Adding the cleanup call after partition processing ensures proper resource deallocation. This aligns well with the new sequence-based tracking system.
280-293: The sequence assignment logic looks correct.
The implementation properly assigns unique sequence numbers and tracks them in the appropriate data structures. The parent state mapping with sequence numbers is a good approach for coordinating updates.
🧰 Tools
🪛 Pylint (3.3.7)
[refactor] 288-288: Do not raise StopIteration in generator, use return statement instead
(R1708)
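The code at line 288 is not shown in this thread, so the following is only a generic illustration of what R1708 guards against. Since PEP 479 (enforced from Python 3.7), a `StopIteration` that escapes a generator body, for example from a bare `next()` call, is converted into a `RuntimeError`. Both function names are hypothetical.

```python
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def first_then_rest_buggy(items: Iterable[T]) -> Iterator[T]:
    it = iter(items)
    first = next(it)  # StopIteration leaks out of the generator on empty input
    yield first
    yield from it


def first_then_rest(items: Iterable[T]) -> Iterator[T]:
    it = iter(items)
    try:
        first = next(it)
    except StopIteration:
        return  # end the generator cleanly instead of leaking StopIteration
    yield first
    yield from it
```

Consuming `first_then_rest_buggy([])` raises `RuntimeError`, while `first_then_rest([])` simply yields nothing.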
330-330: ✅ Verification successful
Verify the cleanup detection logic change.
The change from checking `_finished_partitions` to checking `_seq_by_partition` for cleanup detection makes sense with the new sequence tracking. Could you confirm this correctly identifies partitions that have been fully processed and cleaned up? wdyt?
🏁 Script executed:
#!/bin/bash
# Description: Search for other uses of _finished_partitions to ensure consistency
rg "_finished_partitions" -A 3 -B 3
Length of output: 8910
🏁 Script executed:
#!/bin/bash
# Display code context around lines 250-360 to locate the cleanup logic
sed -n '250,360p' airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
Length of output: 5581
Verify cleanup detection via `_seq_by_partition`
It looks like `_ensure_partition_limit` now drops partitions when `partition_key not in self._seq_by_partition`, which corresponds to partitions that have already been cleaned up by `_cleanup_if_done` (where we pop from `_seq_by_partition`). Given that:
- We add an entry to `_seq_by_partition` at slice generation start.
- We only remove that entry in `_cleanup_if_done` once the last slice is emitted and the semaphore count is zero.
That means by the time `_ensure_partition_limit` runs, any partition absent from `_seq_by_partition` has indeed finished and been cleaned up. wdyt?
• No other code still uses `_finished_partitions` for removal; cleanup and limit logic are now consistently driven by `_seq_by_partition`.
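The membership check discussed above can be sketched in isolation. This is a simplified model under assumptions: `drop_cleaned_up_partitions` and `max_partitions` are hypothetical names, and whatever the real `_ensure_partition_limit` does when no finished partition is available is deliberately not modeled.

```python
from collections import OrderedDict
from typing import Any, Dict, List


def drop_cleaned_up_partitions(
    cursor_per_partition: "OrderedDict[str, Any]",
    seq_by_partition: Dict[str, int],
    max_partitions: int,
) -> List[str]:
    """Evict oldest-first, but only partitions whose sequence entry is already gone."""
    removed: List[str] = []
    for key in list(cursor_per_partition):
        if len(cursor_per_partition) <= max_partitions:
            break
        # Absence from seq_by_partition means cleanup already ran for this key.
        if key not in seq_by_partition:
            del cursor_per_partition[key]
            removed.append(key)
    return removed
```

A partition that still has a sequence entry is skipped even if it is the oldest, which is the property the verification above is checking.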
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (5)
3439-3443: Excellent addition of sequence-based tracking validation!
These new assertions effectively test the internal state management for the sequence-based partition tracking. The checks for `_finished_partitions`, `_open_seqs`, and `_seq_by_partition` align perfectly with the memory leak fixes mentioned in the PR.
One small question - would it be helpful to also verify that the counts match expected values rather than just checking lengths? For instance, asserting that `_open_seqs` contains exactly the expected sequence numbers, wdyt?
🧰 Tools
🪛 Pylint (3.3.7)
[warning] 3440-3440: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3441-3441: Access to a protected member _finished_partitions of a client class
(W0212)
[warning] 3442-3442: Access to a protected member _open_seqs of a client class
(W0212)
[warning] 3443-3443: Access to a protected member _seq_by_partition of a client class
(W0212)
3527-3531: Consistent cleanup verification patterns - nicely done!
The pattern of verifying all internal data structures are properly managed is consistent across test methods. This helps ensure the sequence-based cleanup logic works correctly in different scenarios.
🧰 Tools
🪛 Pylint (3.3.7)
[warning] 3528-3528: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3529-3529: Access to a protected member _finished_partitions of a client class
(W0212)
[warning] 3530-3530: Access to a protected member _open_seqs of a client class
(W0212)
[warning] 3531-3531: Access to a protected member _seq_by_partition of a client class
(W0212)
3606-3611: Thorough cleanup validation for memory leak prevention!
These assertions are crucial for verifying that the memory issues are resolved. Checking that all internal variables (`_semaphore_per_partition`, `_finished_partitions`, `_open_seqs`, `_seq_by_partition`) are cleaned up after processing ensures no resource leaks.
🧰 Tools
🪛 Pylint (3.3.7)
[warning] 3608-3608: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3609-3609: Access to a protected member _finished_partitions of a client class
(W0212)
[warning] 3610-3610: Access to a protected member _open_seqs of a client class
(W0212)
[warning] 3611-3611: Access to a protected member _seq_by_partition of a client class
(W0212)
3730-3733: Smart validation of sequence-based parent state mapping!
The assertions verify that `_partition_parent_state_map` entries now include sequence numbers as expected from the refactoring. Lines 3732-3733 effectively test that the partition keys map to tuples containing both parent state and sequence information.
Just wondering - should we also validate that the sequence numbers are in the expected order or range, wdyt?
🧰 Tools
🪛 Pylint (3.3.7)
[warning] 3730-3730: Access to a protected member _open_seqs of a client class
(W0212)
[warning] 3731-3731: Access to a protected member _seq_by_partition of a client class
(W0212)
[warning] 3732-3732: Access to a protected member _partition_parent_state_map of a client class
(W0212)
[warning] 3733-3733: Access to a protected member _partition_parent_state_map of a client class
(W0212)
3740-3743: Comprehensive cleanup verification after partition closure!
The test properly validates that closing partitions triggers the cleanup of all sequence-related tracking structures. This is essential for confirming the memory issue fixes work as intended.
🧰 Tools
🪛 Pylint (3.3.7)
[warning] 3740-3740: Access to a protected member _finished_partitions of a client class
(W0212)
[warning] 3741-3741: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3742-3742: Access to a protected member _open_seqs of a client class
(W0212)
[warning] 3743-3743: Access to a protected member _seq_by_partition of a client class
(W0212)
/autofix
Actionable comments posted: 0
♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
463-481: 🛠️ Refactor suggestion: Address thread safety documentation and performance concerns.
This is a duplicate of previous feedback about the cleanup method. The implementation has several areas for improvement that were identified in past reviews:
- Protected member access: Line 470 accesses `_semaphore_per_partition[partition_key]._value`, which is a protected member
- Performance: `deque.remove(seq)` on line 478 is an O(n) operation
- Error handling: Missing error handling for edge cases
- Logging format: Should use lazy formatting
Here's an improved version addressing these concerns:
  def _cleanup_if_done(self, partition_key: str) -> None:
      """
      Free every in-memory structure that belonged to a completed partition:
      cursor, semaphore, flag inside `_finished_partitions`
+
+     Note: This method must be called while holding self._lock
      """
+     assert self._lock.locked(), "_cleanup_if_done must be called while holding the lock"
+
      if not (
          partition_key in self._finished_partitions
-         and self._semaphore_per_partition[partition_key]._value == 0
+         and self._semaphore_per_partition.get(partition_key, threading.Semaphore(1)).acquire(blocking=False)
      ):
          return
  
+     # Release the semaphore we just acquired for the check
+     if partition_key in self._semaphore_per_partition:
+         self._semaphore_per_partition[partition_key].release()
      self._semaphore_per_partition.pop(partition_key, None)
      self._finished_partitions.discard(partition_key)
  
-     seq = self._seq_by_partition.pop(partition_key)
-     self._open_seqs.remove(seq)
+     seq = self._seq_by_partition.pop(partition_key, None)
+     if seq is not None:
+         try:
+             self._open_seqs.remove(seq)
+         except ValueError:
+             logger.warning("Sequence %s not found in open_seqs for partition %s", seq, partition_key)
  
-     logger.debug(f"Partition {partition_key} fully processed and cleaned up.")
+     logger.debug("Partition %s fully processed and cleaned up.", partition_key)
What do you think about this approach? The semaphore check is still not ideal due to the race condition, but it's safer than accessing protected members. wdyt?
🧰 Tools
🪛 Pylint (3.3.7)
[warning] 470-470: Access to a protected member _value of a client class
(W0212)
[warning] 480-480: Use lazy % formatting in logging functions
(W1203)
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
186-196: Fix unused variable and consider edge case.
The logic improvement using sequence numbers is great! However, there's an unused variable that should be removed.
- while self._partition_parent_state_map:
-     earliest_key, (candidate_state, candidate_seq) = next(
-         iter(self._partition_parent_state_map.items())
-     )
+ while self._partition_parent_state_map:
+     _, (candidate_state, candidate_seq) = next(
+         iter(self._partition_parent_state_map.items())
+     )
Also, wdyt about adding a safety check for empty `_open_seqs` to be more explicit about the condition?
🧰 Tools
[warning] 186-186: Unused variable 'earliest_key'
(W0612)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (6 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (3)
airbyte_cdk/sources/streams/checkpoint/per_partition_key_serializer.py (1)
  PerPartitionKeySerializer (7-22)
airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py (1)
  Timer (50-68)
airbyte_cdk/sources/streams/concurrent/cursor.py (1)
  _emit_state_message (274-283)
🪛 Pylint (3.3.7)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
[warning] 186-186: Unused variable 'earliest_key'
(W0612)
[refactor] 288-288: Do not raise StopIteration in generator, use return statement instead
(R1708)
[warning] 470-470: Access to a protected member _value of a client class
(W0212)
[warning] 480-480: Use lazy % formatting in logging functions
(W1203)
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
🔇 Additional comments (7)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (7)
9-9: Import looks good!
The addition of the `deque` import is necessary for the new sequence tracking functionality.
102-104: Nice enhancement to the data structure!
Storing tuples with both parent state and sequence number enables better coordination of parent state updates. The type annotation clearly indicates the structure.
109-114: Good addition of sequence tracking attributes!
The new attributes provide the foundation for proper partition lifecycle management and resource cleanup. The comments clearly explain their purpose.
175-176: Excellent addition of cleanup logic!
Calling cleanup after partition processing ensures proper resource deallocation and prevents memory leaks.
280-284: Solid sequence assignment implementation!
The sequence generation and tracking logic is clean and thread-safe within the lock context.
289-292: Good integration of sequence numbers!
The parent state map now properly stores the sequence number alongside the state, enabling the improved coordination logic.
330-330: Smart optimization for cleanup detection!
Using `_seq_by_partition` to identify finished partitions is more efficient than checking multiple conditions.
My understanding of this PR:
- We stopped relying on `_finished_partitions` to clean the parent state, which enables the next point.
- We clean everything up once we close a partition whose partition key has been fully processed (i.e. its stream slices were all created and all closed).
The drawback is that we now keep an integer in memory for each partition (the impact should be very limited). What I haven't thought through is how this works if the same partition key is emitted multiple times by the parent stream. Let's say we have:
- Parent record ID = 1
- Parent record ID = 2
- Parent record ID = ...
- Parent record ID = 1000
- Parent record ID = 1
It feels like there are two cases here:
- The in-memory items related to parent record ID = 1 are cleaned up (because all its stream slices have been processed) before parent record ID = 1 is emitted again. In that case, it is as if we had never processed parent record ID = 1.
- We are still processing the stream slices for parent record ID = 1 when it is emitted a second time. In that case, does that mean we won't clean up parent IDs 2 to 1000 until we have reprocessed parent record ID = 1?
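The concern in the second case can be explored with a toy model. It does not answer how the PR handles duplicate keys; it only shows how much parent state would stay in memory while the oldest sequence is still open, assuming the rule that a candidate is committed only when no older-or-equal sequence is open. `pending_parent_states` and its parameters are hypothetical.

```python
from collections import OrderedDict, deque
from typing import Iterable


def pending_parent_states(closed_order: Iterable[int], total: int) -> int:
    """Register `total` partitions, close some, and count the parent states still held."""
    state_map = OrderedDict((f"id={i}", ({"parent": i}, i)) for i in range(total))
    open_seqs = deque(range(total))
    for seq in closed_order:
        open_seqs.remove(seq)
        # Commit and drop candidates until one is blocked by an older open sequence.
        while state_map:
            _, (_state, candidate_seq) = next(iter(state_map.items()))
            if open_seqs and open_seqs[0] <= candidate_seq:
                break
            state_map.popitem(last=False)
    return len(state_map)
```

Under this model, closing sequences 1 to 999 while sequence 0 stays open releases nothing: all 1000 parent-state entries remain until sequence 0 is closed.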
Actionable comments posted: 1
🧹 Nitpick comments (5)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (4)
102-114: Consider adding documentation for the new complex state tracking.
The new data structures introduce sophisticated partition lifecycle management. While the implementation looks correct, the relationships between these structures could benefit from more documentation. Would you consider adding docstrings explaining how these work together? For example:
+ # Sequence-based partition tracking for memory management and parent state coordination:
+ # - _processing_partitions_indexes: Active partition sequence numbers (sorted)
+ # - _partition_key_to_index: Maps partition keys to their sequence numbers
+ # - _generated_partitions_count: Monotonic counter for sequence generation
  self._partition_parent_state_map: OrderedDict[str, tuple[Mapping[str, Any], int]] = (
      OrderedDict()
  )
🧰 Tools
🪛 Pylint (3.3.7)
[refactor] 110-110: Consider using [] instead of list()
(R1734)
284-288: Consider using a more Pythonic list initialization.
The sequence tracking logic looks correct and thread-safe since it's under the lock. However, static analysis suggests using `[]` instead of `list()` for initialization. Also, consider if using a deque might be more efficient for the frequent append/remove operations on `_processing_partitions_indexes`. Wdyt?
- self._processing_partitions_indexes: List[int] = list()
+ self._processing_partitions_indexes: List[int] = []
467-485: Review the cleanup method for potential improvements.
The cleanup method addresses the memory issues well, but there are a few considerations:
- Performance: `list.remove(seq)` is O(n). Given that this could be called frequently, would using a more efficient data structure help?
- Logging: Static analysis suggests using lazy formatting for the debug log.
- Error handling: Should we handle the case where `seq` might not be in `_processing_partitions_indexes`?
  def _cleanup_if_done(self, partition_key: str) -> None:
      """
      Free every in-memory structure that belonged to a completed partition:
      cursor, semaphore, flag inside `_finished_partitions`
+
+     Note: This method must be called while holding self._lock
      """
      if not (
          partition_key in self._partitions_done_generating_stream_slices
          and self._semaphore_per_partition[partition_key]._value == 0
      ):
          return
  
      self._semaphore_per_partition.pop(partition_key, None)
      self._partitions_done_generating_stream_slices.discard(partition_key)
  
-     seq = self._partition_key_to_index.pop(partition_key)
-     self._processing_partitions_indexes.remove(seq)
+     seq = self._partition_key_to_index.pop(partition_key, None)
+     if seq is not None:
+         try:
+             self._processing_partitions_indexes.remove(seq)
+         except ValueError:
+             logger.warning("Sequence %s not found in processing indexes for partition %s", seq, partition_key)
  
-     logger.debug(f"Partition {partition_key} fully processed and cleaned up.")
+     logger.debug("Partition %s fully processed and cleaned up.", partition_key)
🧰 Tools
🪛 Pylint (3.3.7)
[warning] 474-474: Access to a protected member _value of a client class
(W0212)
[warning] 484-484: Use lazy % formatting in logging functions
(W1203)
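One possible answer to the "more efficient data structure" question is a min-heap with lazy deletion. This is a swapped-in technique, not what the PR does; the class `OpenSequences` and its method names are hypothetical. Closing a sequence becomes O(1), and stale heap entries are discarded only when the oldest open sequence is requested.

```python
import heapq
from typing import List, Optional, Set


class OpenSequences:
    """Tracks open sequence numbers; removal is deferred until the minimum is read."""

    def __init__(self) -> None:
        self._heap: List[int] = []
        self._closed: Set[int] = set()

    def open(self, seq: int) -> None:
        heapq.heappush(self._heap, seq)

    def close(self, seq: int) -> None:
        # O(1): just remember that this sequence is done.
        self._closed.add(seq)

    def oldest_open(self) -> Optional[int]:
        # Pop closed sequences off the top until an open one (or nothing) remains.
        while self._heap and self._heap[0] in self._closed:
            self._closed.discard(heapq.heappop(self._heap))
        return self._heap[0] if self._heap else None
```

The trade-off is extra memory for the `_closed` set while stale entries wait to be popped, which may or may not matter compared with the simplicity of `list.remove`.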
171-171: Consider alternatives to accessing semaphore's protected _value attribute.
The code accesses `_value` on the semaphore, which is a protected member. While this works, it might be more robust to use the public API or track the semaphore count separately. However, I understand this might be necessary for the current implementation. Wdyt about documenting why this approach is needed?
Also applies to: 474-474
🧰 Tools
🪛 Pylint (3.3.7)
[warning] 171-171: Access to a protected member _value of a client class
(W0212)
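As one way to "track the semaphore count separately", a small lock-protected counter avoids reading `Semaphore._value`. This is a sketch under assumptions: `InFlightSlices` and its method names are hypothetical, and it assumes the count should rise when a slice is generated and fall when a slice is closed.

```python
import threading


class InFlightSlices:
    """Hypothetical counter of slices generated but not yet closed."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._count = 0

    def slice_generated(self) -> None:
        with self._lock:
            self._count += 1

    def slice_closed(self) -> int:
        """Decrement and return the number of slices still in flight."""
        with self._lock:
            if self._count == 0:
                raise RuntimeError("slice_closed() called more often than slice_generated()")
            self._count -= 1
            return self._count

    def is_idle(self) -> bool:
        with self._lock:
            return self._count == 0
```

A cleanup check could then use `is_idle()` through a public method rather than a protected attribute, at the cost of one more object per partition.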
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)
3793-3800: Simple and focused helper function
The `_make_inner_cursor` helper function is well-designed - it creates a properly configured mock cursor with minimal setup. The function is focused and reusable.
One small suggestion: would it be worth adding a docstring that's a bit more descriptive about the mock's behavior, particularly that it yields exactly one slice? The current docstring is good but could be slightly more detailed, wdyt?
  def _make_inner_cursor(ts: str) -> MagicMock:
-     """Return an inner cursor that yields exactly one slice and has a proper state."""
+     """
+     Return a mock inner cursor for testing.
+
+     Args:
+         ts: The timestamp to use for the cursor state
+
+     Returns:
+         A MagicMock configured to yield exactly one dummy slice and return the given timestamp as state
+     """
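The body of the helper is not shown in this thread, so the following is only a guess at what such a mock could look like. The attribute names `stream_slices` and `state`, the slice contents, and the state key `updated_at` are all assumptions made for illustration; only the `unittest.mock` calls are real API.

```python
from unittest.mock import MagicMock


def make_inner_cursor(ts: str) -> MagicMock:
    """Return a mock cursor that yields exactly one dummy slice and exposes `ts` as state."""
    cursor = MagicMock()
    # side_effect returns a fresh iterator on every call, so repeated calls still yield one slice
    cursor.stream_slices.side_effect = lambda: iter([{"dummy": "slice"}])
    # "updated_at" is a hypothetical cursor field name
    cursor.state = {"updated_at": ts}
    return cursor
```

Using `side_effect` rather than `return_value = iter(...)` avoids handing out an already exhausted iterator if the test calls `stream_slices()` twice.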
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (7 hunks)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (5 hunks)
🧰 Additional context used
🪛 Pylint (3.3.7)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
[refactor] 110-110: Consider using [] instead of list()
(R1734)
[warning] 171-171: Access to a protected member _value of a client class
(W0212)
[warning] 186-186: Unused variable 'earliest_key'
(W0612)
[refactor] 292-292: Do not raise StopIteration in generator, use return statement instead
(R1708)
[warning] 474-474: Access to a protected member _value of a client class
(W0212)
[warning] 484-484: Use lazy % formatting in logging functions
(W1203)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
[warning] 3440-3440: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3441-3441: Access to a protected member _partitions_done_generating_stream_slices of a client class
(W0212)
[warning] 3442-3442: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3443-3443: Access to a protected member _partition_key_to_index of a client class
(W0212)
[warning] 3528-3528: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3529-3529: Access to a protected member _partitions_done_generating_stream_slices of a client class
(W0212)
[warning] 3530-3530: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3531-3531: Access to a protected member _partition_key_to_index of a client class
(W0212)
[warning] 3608-3608: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3609-3609: Access to a protected member _partitions_done_generating_stream_slices of a client class
(W0212)
[warning] 3610-3610: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3611-3611: Access to a protected member _partition_key_to_index of a client class
(W0212)
[warning] 3730-3730: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3731-3731: Access to a protected member _partition_key_to_index of a client class
(W0212)
[warning] 3732-3732: Access to a protected member _partition_parent_state_map of a client class
(W0212)
[warning] 3733-3733: Access to a protected member _partition_parent_state_map of a client class
(W0212)
[warning] 3740-3740: Access to a protected member _partitions_done_generating_stream_slices of a client class
(W0212)
[warning] 3741-3741: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3742-3742: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3743-3743: Access to a protected member _partition_key_to_index of a client class
(W0212)
[convention] 3803-3803: Missing function or method docstring
(C0116)
[warning] 3839-3839: Access to a protected member _partition_router of a client class
(W0212)
[warning] 3860-3860: Access to a protected member _IS_PARTITION_DUPLICATION_LOGGED of a client class
(W0212)
[warning] 3861-3861: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3862-3862: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3863-3863: Access to a protected member _partition_key_to_index of a client class
(W0212)
[convention] 3866-3866: Missing function or method docstring
(C0116)
[warning] 3893-3893: Access to a protected member _partition_router of a client class
(W0212)
[warning] 3910-3910: Access to a protected member _IS_PARTITION_DUPLICATION_LOGGED of a client class
(W0212)
[warning] 3911-3911: Access to a protected member _cursor_per_partition of a client class
(W0212)
[warning] 3912-3912: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3913-3913: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3914-3914: Access to a protected member _partition_key_to_index of a client class
(W0212)
⏰ Context from checks skipped due to timeout of 90000ms (5)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
🔇 Additional comments (10)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (3)
69-70: Constants look good, but consider the past feedback about readability.
Nice addition of the constants! This addresses the previous feedback about using `PARENT_STATE = 0` and `GENERATION_SEQUENCE = 1` for better readability. The naming clearly indicates their purpose in tuple indexing.
279-279: Early return for duplicates is a clean solution.
This early return approach for handling duplicate partitions is much cleaner than the previous logic. It prevents unnecessary processing and maintains the warning for debugging purposes. Nice work!
170-177: ✅ Verification successful
Good integration of cleanup logic, but verify thread safety.
The cleanup integration looks well-placed after the global cursor update. However, I want to verify that `_cleanup_if_done` is always called under the lock as intended, since it accesses shared data structures. The implementation looks correct, but wdyt about adding an assertion to make this requirement explicit?
🏁 Script executed:
    #!/bin/bash
    # Verify that _cleanup_if_done is only called from contexts that hold the lock
    rg -A 5 -B 5 "_cleanup_if_done"
Length of output: 2563
🏁 Script executed:
    #!/bin/bash
    # Inspect the initial part of the concurrent_partition_cursor to locate lock definitions and usage
    sed -n '1,300p' airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
Length of output: 12533
Thread safety confirmed for `_cleanup_if_done`
I checked the calls to `_cleanup_if_done` and it's only ever invoked inside the `with self._lock:` block in `close_partition` (no other call sites). The lock guarantees that shared state is protected. Adding an assertion in `_cleanup_if_done` (e.g. switching to an `RLock` and asserting ownership) could make this even more explicit, but isn't strictly required given the current usage.
Wdyt: should we leave it as is or introduce an explicit lock-ownership check?
🧰 Tools
🪛 Pylint (3.3.7)
[warning] 171-171: Access to a protected member _value of a client class
(W0212)
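If an explicit lock-ownership check is wanted, one way to do it without relying on private `RLock` internals is to record the owning thread alongside the lock. The sketch below is illustrative only: `OwnedLock` and `CursorSketch` are hypothetical names, and the real cursor holds far more state than the single set used here.

```python
import threading
from typing import Optional, Set


class OwnedLock:
    """Hypothetical lock wrapper that remembers which thread holds it."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._owner: Optional[int] = None

    def __enter__(self) -> "OwnedLock":
        self._lock.acquire()
        self._owner = threading.get_ident()
        return self

    def __exit__(self, *exc_info: object) -> None:
        # Clear the owner before releasing so no other thread sees a stale id.
        self._owner = None
        self._lock.release()

    def held_by_current_thread(self) -> bool:
        return self._owner == threading.get_ident()


class CursorSketch:
    """Stand-in for the cursor: only the locking contract is modeled."""

    def __init__(self) -> None:
        self._lock = OwnedLock()
        self._done: Set[str] = set()

    def _cleanup_if_done(self, partition_key: str) -> None:
        # Make the "must be called while holding self._lock" rule executable.
        assert self._lock.held_by_current_thread(), "caller must hold self._lock"
        self._done.discard(partition_key)

    def close_partition(self, partition_key: str) -> None:
        with self._lock:
            self._cleanup_if_done(partition_key)
```

Note that `assert` statements are skipped when Python runs with `-O`, so this is a development-time guard rather than a runtime guarantee.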
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (7)
3439-3443: LGTM: Enhanced internal state tracking validation
These new assertions properly validate the sequence-based partition tracking structures mentioned in the PR objectives. The checks ensure that internal state management is working correctly after partition operations.
The static analysis warnings about accessing protected members are expected and appropriate in test code that needs to verify internal implementation details, wdyt?
🧰 Tools
🪛 Pylint (3.3.7)
[warning] 3440-3440: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3441-3441: Access to a protected member _partitions_done_generating_stream_slices of a client class
(W0212)
[warning] 3442-3442: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3443-3443: Access to a protected member _partition_key_to_index of a client class
(W0212)
3527-3531: Consistent internal state validation
Good consistency with the previous assertions pattern. These checks validate the same internal tracking structures to ensure proper cleanup behavior across different test scenarios.
🧰 Tools
🪛 Pylint (3.3.7)
[warning] 3528-3528: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3529-3529: Access to a protected member _partitions_done_generating_stream_slices of a client class
(W0212)
[warning] 3530-3530: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3531-3531: Access to a protected member _partition_key_to_index of a client class
(W0212)
3606-3611: Comprehensive cleanup verification
Excellent addition of comprehensive cleanup verification! This ensures that all the new internal tracking structures are properly cleaned up when all partitions are finished, which is crucial for preventing memory leaks in the concurrent processing implementation.
🧰 Tools
🪛 Pylint (3.3.7)
[warning] 3608-3608: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3609-3609: Access to a protected member _partitions_done_generating_stream_slices of a client class
(W0212)
[warning] 3610-3610: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3611-3611: Access to a protected member _partition_key_to_index of a client class
(W0212)
3730-3733: Detailed partition state verification
Great addition of more granular assertions that verify specific parent state values in addition to collection lengths. This provides better test coverage of the partition lifecycle and parent state management logic.
🧰 Tools
🪛 Pylint (3.3.7)
[warning] 3730-3730: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3731-3731: Access to a protected member _partition_key_to_index of a client class
(W0212)
[warning] 3732-3732: Access to a protected member _partition_parent_state_map of a client class
(W0212)
[warning] 3733-3733: Access to a protected member _partition_parent_state_map of a client class
(W0212)
3740-3743: Final cleanup validation
Perfect final verification that all internal structures are cleaned up after partition processing completes. This completes the comprehensive cleanup testing strategy.
🧰 Tools
🪛 Pylint (3.3.7)
[warning] 3740-3740: Access to a protected member _partitions_done_generating_stream_slices of a client class
(W0212)
[warning] 3741-3741: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3742-3742: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3743-3743: Access to a protected member _partition_key_to_index of a client class
(W0212)
3803-3864: Excellent duplicate partition handling test
This test provides great coverage for the duplicate partition scenario where partitions are cleaned up before duplicates arrive. The test logic is clear and well-structured:
- Sets up a sequence of partitions with a duplicate
- Processes them with cleanup between iterations
- Verifies that no duplication is detected and cleanup works properly
The test covers an important edge case in concurrent partition processing. Well done!
🧰 Tools
🪛 Pylint (3.3.7)
[convention] 3803-3803: Missing function or method docstring
(C0116)
[warning] 3839-3839: Access to a protected member _partition_router of a client class
(W0212)
[warning] 3860-3860: Access to a protected member _IS_PARTITION_DUPLICATION_LOGGED of a client class
(W0212)
[warning] 3861-3861: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3862-3862: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3863-3863: Access to a protected member _partition_key_to_index of a client class
(W0212)
3866-3914: Thorough duplicate detection during processing
Excellent test for detecting duplicates while partitions are still being processed! This covers the complementary scenario to the previous test. Key strengths:
- Tests the case where duplicates are detected before cleanup
- Verifies that the warning logging flag is properly set
- Ensures that only unique partitions are emitted (2 out of 3)
- Validates proper cleanup after processing
This provides comprehensive coverage of duplicate partition handling scenarios, which is crucial for the reliability of concurrent processing, wdyt?
🧰 Tools
🪛 Pylint (3.3.7)
[convention] 3866-3866: Missing function or method docstring
(C0116)
[warning] 3893-3893: Access to a protected member _partition_router of a client class
(W0212)
[warning] 3910-3910: Access to a protected member _IS_PARTITION_DUPLICATION_LOGGED of a client class
(W0212)
[warning] 3911-3911: Access to a protected member _cursor_per_partition of a client class
(W0212)
[warning] 3912-3912: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3913-3913: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3914-3914: Access to a protected member _partition_key_to_index of a client class
(W0212)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
Actionable comments posted: 0
🧹 Nitpick comments (3)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (3)
3793-3800: Consider adding a docstring for the helper function?
The `_make_inner_cursor` function is a useful helper, but it's missing documentation. Would you consider adding a brief docstring to describe its purpose and return value? Something like:
    def _make_inner_cursor(ts: str) -> MagicMock:
        """Create a mock inner cursor with a single slice and specified timestamp state.

        Args:
            ts: The timestamp string to use for the cursor state

        Returns:
            A configured MagicMock representing an inner cursor
        """
3867-3932: Great coverage for the cursor-exists duplicate scenario!
This test nicely complements the previous one by covering the case where the duplicate partition appears but the cursor still exists. The verification that `_IS_PARTITION_DUPLICATION_LOGGED` remains `False` and that only 2 cursors are kept (for "1" & "2") is exactly right for this scenario.
Consider adding a docstring to explain the test's purpose? Something like:
    def test_duplicate_partition_after_closing_partition_cursor_exists():
        """Test duplicate partition handling when the original cursor still exists after closing."""
🧰 Tools
🪛 Pylint (3.3.7)
[convention] 3867-3867: Missing function or method docstring
(C0116)
[warning] 3901-3901: Access to a protected member _partition_router of a client class
(W0212)
[warning] 3926-3926: Access to a protected member _IS_PARTITION_DUPLICATION_LOGGED of a client class
(W0212)
[warning] 3927-3927: Access to a protected member _cursor_per_partition of a client class
(W0212)
[warning] 3928-3928: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3929-3929: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3930-3930: Access to a protected member _partition_key_to_index of a client class
(W0212)
[warning] 3931-3931: Access to a protected member _partitions_done_generating_stream_slices of a client class
(W0212)
3934-3983: Comprehensive duplicate detection while processing!
This test covers the important edge case where a duplicate partition is detected while the original is still being processed. The key assertions are spot-on:
- Only 2 slices generated (duplicate skipped)
- `_IS_PARTITION_DUPLICATION_LOGGED` is True (warning emitted)
- Proper cleanup of all tracking structures
The test demonstrates the duplicate detection working as intended. Would you consider adding a docstring here too for consistency?
🧰 Tools
🪛 Pylint (3.3.7)
[convention] 3934-3934: Missing function or method docstring
(C0116)
[warning] 3961-3961: Access to a protected member _partition_router of a client class
(W0212)
[warning] 3978-3978: Access to a protected member _IS_PARTITION_DUPLICATION_LOGGED of a client class
(W0212)
[warning] 3979-3979: Access to a protected member _cursor_per_partition of a client class
(W0212)
[warning] 3980-3980: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3981-3981: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3982-3982: Access to a protected member _partition_key_to_index of a client class
(W0212)
[warning] 3983-3983: Access to a protected member _partitions_done_generating_stream_slices of a client class
(W0212)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
(5 hunks)
🧰 Additional context used
🪛 Pylint (3.3.7)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
[warning] 3440-3440: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3441-3441: Access to a protected member _partitions_done_generating_stream_slices of a client class
(W0212)
[warning] 3442-3442: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3443-3443: Access to a protected member _partition_key_to_index of a client class
(W0212)
[warning] 3528-3528: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3529-3529: Access to a protected member _partitions_done_generating_stream_slices of a client class
(W0212)
[warning] 3530-3530: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3531-3531: Access to a protected member _partition_key_to_index of a client class
(W0212)
[warning] 3608-3608: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3609-3609: Access to a protected member _partitions_done_generating_stream_slices of a client class
(W0212)
[warning] 3610-3610: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3611-3611: Access to a protected member _partition_key_to_index of a client class
(W0212)
[warning] 3730-3730: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3731-3731: Access to a protected member _partition_key_to_index of a client class
(W0212)
[warning] 3732-3732: Access to a protected member _partition_parent_state_map of a client class
(W0212)
[warning] 3733-3733: Access to a protected member _partition_parent_state_map of a client class
(W0212)
[warning] 3740-3740: Access to a protected member _partitions_done_generating_stream_slices of a client class
(W0212)
[warning] 3741-3741: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3742-3742: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3743-3743: Access to a protected member _partition_key_to_index of a client class
(W0212)
[convention] 3803-3803: Missing function or method docstring
(C0116)
[warning] 3839-3839: Access to a protected member _partition_router of a client class
(W0212)
[warning] 3860-3860: Access to a protected member _IS_PARTITION_DUPLICATION_LOGGED of a client class
(W0212)
[warning] 3861-3861: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3862-3862: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3863-3863: Access to a protected member _partition_key_to_index of a client class
(W0212)
[warning] 3864-3864: Access to a protected member _partitions_done_generating_stream_slices of a client class
(W0212)
[convention] 3867-3867: Missing function or method docstring
(C0116)
[warning] 3901-3901: Access to a protected member _partition_router of a client class
(W0212)
[warning] 3926-3926: Access to a protected member _IS_PARTITION_DUPLICATION_LOGGED of a client class
(W0212)
[warning] 3927-3927: Access to a protected member _cursor_per_partition of a client class
(W0212)
[warning] 3928-3928: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3929-3929: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3930-3930: Access to a protected member _partition_key_to_index of a client class
(W0212)
[warning] 3931-3931: Access to a protected member _partitions_done_generating_stream_slices of a client class
(W0212)
[convention] 3934-3934: Missing function or method docstring
(C0116)
[warning] 3961-3961: Access to a protected member _partition_router of a client class
(W0212)
[warning] 3978-3978: Access to a protected member _IS_PARTITION_DUPLICATION_LOGGED of a client class
(W0212)
[warning] 3979-3979: Access to a protected member _cursor_per_partition of a client class
(W0212)
[warning] 3980-3980: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3981-3981: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3982-3982: Access to a protected member _partition_key_to_index of a client class
(W0212)
[warning] 3983-3983: Access to a protected member _partitions_done_generating_stream_slices of a client class
(W0212)
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
🔇 Additional comments (6)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (6)
3439-3443: Enhanced internal state verification looks solid!
These new assertions effectively verify that the sequence-based partition tracking structures are properly maintained. The checks for `_partitions_done_generating_stream_slices`, `_processing_partitions_indexes`, and `_partition_key_to_index` provide good coverage of the internal state management. The assertion that there's exactly 1 item in each structure makes sense given the test context where one partition is still being processed.
🧰 Tools
🪛 Pylint (3.3.7)
[warning] 3440-3440: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3441-3441: Access to a protected member _partitions_done_generating_stream_slices of a client class
(W0212)
[warning] 3442-3442: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3443-3443: Access to a protected member _partition_key_to_index of a client class
(W0212)
3527-3531: Consistent state verification pattern - nice!
Good to see the same internal state assertions being applied consistently across different test scenarios. This helps ensure the partition lifecycle management works correctly in various contexts.
🧰 Tools
🪛 Pylint (3.3.7)
[warning] 3528-3528: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3529-3529: Access to a protected member _partitions_done_generating_stream_slices of a client class
(W0212)
[warning] 3530-3530: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3531-3531: Access to a protected member _partition_key_to_index of a client class
(W0212)
3606-3611: Excellent cleanup verification!
These assertions are crucial for verifying that the memory issues mentioned in the PR title are properly addressed. Ensuring all internal tracking structures are cleaned up to length 0 after partition closure is exactly what we need to prevent memory leaks, wdyt?
🧰 Tools
🪛 Pylint (3.3.7)
[warning] 3608-3608: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3609-3609: Access to a protected member _partitions_done_generating_stream_slices of a client class
(W0212)
[warning] 3610-3610: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3611-3611: Access to a protected member _partition_key_to_index of a client class
(W0212)
3730-3733: Comprehensive state tracking verification!
The addition of `_processing_partitions_indexes` and `_partition_key_to_index` checks, along with the specific verification of `_partition_parent_state_map` contents, provides thorough coverage of the new sequence-based tracking. The assertions verify both the structure sizes and the actual content, which is great for catching logic errors.
🧰 Tools
🪛 Pylint (3.3.7)
[warning] 3730-3730: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3731-3731: Access to a protected member _partition_key_to_index of a client class
(W0212)
[warning] 3732-3732: Access to a protected member _partition_parent_state_map of a client class
(W0212)
[warning] 3733-3733: Access to a protected member _partition_parent_state_map of a client class
(W0212)
3740-3743: Perfect cleanup validation!
These assertions ensure that after closing partitions, all the new tracking structures are properly cleaned up, which directly addresses the memory management concerns mentioned in the PR objectives.
🧰 Tools
🪛 Pylint (3.3.7)
[warning] 3740-3740: Access to a protected member _partitions_done_generating_stream_slices of a client class
(W0212)
[warning] 3741-3741: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3742-3742: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3743-3743: Access to a protected member _partition_key_to_index of a client class
(W0212)
3803-3865: Excellent duplicate partition test coverage!
This test effectively covers the scenario where a duplicate partition appears after the original cursor has been cleaned up. The test structure is well-organized:
- Sets up the partition sequence correctly (1 → 2 → 1)
- Uses lazy iteration to ensure proper timing
- Verifies that no duplicate warning is logged (since the cursor was cleaned)
- Confirms all internal structures are properly cleaned up
The logic makes sense - when the first "1" is closed and cleaned up before the second "1" arrives, it should be treated as a new partition rather than a duplicate, wdyt?
🧰 Tools
🪛 Pylint (3.3.7)
[convention] 3803-3803: Missing function or method docstring
(C0116)
[warning] 3839-3839: Access to a protected member _partition_router of a client class
(W0212)
[warning] 3860-3860: Access to a protected member _IS_PARTITION_DUPLICATION_LOGGED of a client class
(W0212)
[warning] 3861-3861: Access to a protected member _semaphore_per_partition of a client class
(W0212)
[warning] 3862-3862: Access to a protected member _processing_partitions_indexes of a client class
(W0212)
[warning] 3863-3863: Access to a protected member _partition_key_to_index of a client class
(W0212)
[warning] 3864-3864: Access to a protected member _partitions_done_generating_stream_slices of a client class
(W0212)
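The two duplicate scenarios discussed in these comments (key reappears after cleanup versus key reappears while still in flight) can be illustrated with a deliberately simplified model. `DuplicateFilterSketch` is a hypothetical class, not the cursor's actual logic; in particular the real cursor tracks semaphores, sequence numbers, and per-partition cursors, none of which are modeled here.

```python
import logging
from typing import Iterable, Iterator, Set

logger = logging.getLogger(__name__)


class DuplicateFilterSketch:
    """Simplified model: a key is a duplicate only while it is still active."""

    def __init__(self) -> None:
        self.active: Set[str] = set()
        self.duplication_logged = False

    def generate(self, keys: Iterable[str]) -> Iterator[str]:
        for key in keys:
            if key in self.active:
                # Warn once, then skip the duplicate (the early return path).
                if not self.duplication_logged:
                    logger.warning("Partition %s is already being processed; skipping", key)
                    self.duplication_logged = True
                continue
            self.active.add(key)
            yield key

    def close(self, key: str) -> None:
        """Cleanup: forget the key so a later occurrence counts as new."""
        self.active.discard(key)


# Scenario 1: each partition is closed before the next key is pulled,
# so the second "1" is treated as a new partition.
lazy = DuplicateFilterSketch()
emitted_lazy = []
for key in lazy.generate(["1", "2", "1"]):
    emitted_lazy.append(key)
    lazy.close(key)

# Scenario 2: nothing is closed while generating, so the second "1" is skipped.
eager = DuplicateFilterSketch()
emitted_eager = list(eager.generate(["1", "2", "1"]))
```

The lazy iteration in scenario 1 matters: because `generate` is a generator, the cleanup of the first "1" happens before the filter ever inspects the second "1".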
Just one last question
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
Approved! Let's test this with the connector
lgtm
great job!
Actionable comments posted: 0
♻️ Duplicate comments (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)
1-1: Address the pipeline failure by running Ruff formatting.
The GitHub Actions pipeline is failing due to formatting issues. Could you run `ruff format` to fix the code style issues? wdyt?
🧰 Tools
🪛 GitHub Actions: Linters
[error] 1-1: Ruff formatting check failed. File would be reformatted. Run 'ruff format' to fix code style issues.
186-200: Fix the unused variable flagged in past reviews.
The `earliest_key` variable is still unused as flagged in previous reviews. Since we only need the state and sequence values, could you address this? wdyt?
    - earliest_key, (candidate_state, candidate_seq) = next(
    -     iter(self._partition_parent_state_map.items())
    - )
    + _, (candidate_state, candidate_seq) = next(
    +     iter(self._partition_parent_state_map.items())
    + )
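As a possible alternative to the `_` placeholder, iterating over `.values()` avoids binding the key at all. This is a small standalone sketch; the use of `OrderedDict` and the `updated_at` state shape are assumptions, since the type of `_partition_parent_state_map` is not shown in this review.

```python
from collections import OrderedDict

# Hypothetical contents: partition key -> (parent_state, generation_sequence)
parent_state_map = OrderedDict()
parent_state_map["p1"] = ({"updated_at": "2024-01-01"}, 0)
parent_state_map["p2"] = ({"updated_at": "2024-01-02"}, 1)

# Take the earliest entry without naming the key.
candidate_state, candidate_seq = next(iter(parent_state_map.values()))
```

Either form satisfies the linter; `.values()` just makes it explicit that the key is not needed.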
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)
110-110: Consider using list literal for initialization.
The static analysis tool suggests using `[]` instead of `list()` for better performance and readability. wdyt about this change?
    - self._processing_partitions_indexes: List[int] = list()
    + self._processing_partitions_indexes: List[int] = []
🧰 Tools
🪛 Pylint (3.3.7)
[refactor] 110-110: Consider using [] instead of list()
(R1734)
485-486: Consider error handling for sequence removal.
While the logic looks correct, the `remove()` operation could potentially raise a `ValueError` if the sequence isn't found. Would you consider adding error handling here for robustness? wdyt?
      seq = self._partition_key_to_index.pop(partition_key)
    - self._processing_partitions_indexes.remove(seq)
    + try:
    +     self._processing_partitions_indexes.remove(seq)
    + except ValueError:
    +     logger.warning(f"Sequence {seq} not found in processing indexes for partition {partition_key}")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
(9 hunks)
🧰 Additional context used
🪛 Pylint (3.3.7)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
[refactor] 110-110: Consider using [] instead of list()
(R1734)
[refactor] 292-292: Do not raise StopIteration in generator, use return statement instead
(R1708)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
[error] 1-1: Ruff formatting check failed. File would be reformatted. Run 'ruff format' to fix code style issues.
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Publish SDM to DockerHub
- GitHub Check: Analyze (python)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
🔇 Additional comments (6)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (6)
69-70: Great improvement using constants for tuple indices!
This addresses the previous review suggestion about using named constants instead of magic numbers. Much more readable now!
102-114: The sequence-based tracking architecture looks solid.
The new data structures for managing partition lifecycle and parent state updates appear well-designed. The separation of concerns between sequence tracking, completion status, and index mapping should improve memory management significantly.
🧰 Tools
🪛 Pylint (3.3.7)
[refactor] 110-110: Consider using [] instead of list()
(R1734)
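To illustrate how sequence numbers can gate parent state updates, here is a rough sketch under stated assumptions. It reuses the constant names `PARENT_STATE` and `GENERATION_SEQUENCE` quoted in this review, but the function `advance_parent_state`, the map layout, and the rule "an entry may be committed only if its sequence is lower than every in-flight sequence" are my guess at the intent, not the PR's actual code.

```python
from collections import OrderedDict
from typing import Any, Dict, List, Optional, Tuple

PARENT_STATE = 0
GENERATION_SEQUENCE = 1

StateEntry = Tuple[Dict[str, Any], int]


def advance_parent_state(
    state_map: "OrderedDict[str, StateEntry]",
    processing_indexes: List[int],
) -> Optional[Dict[str, Any]]:
    """Pop entries generated before every in-flight partition.

    Returns the parent state of the last popped entry, or None if nothing
    could be committed yet.
    """
    earliest_in_flight = min(processing_indexes) if processing_indexes else None
    latest: Optional[Dict[str, Any]] = None
    while state_map:
        entry = next(iter(state_map.values()))
        if earliest_in_flight is not None and entry[GENERATION_SEQUENCE] >= earliest_in_flight:
            # An earlier (or this) partition is still running: stop here.
            break
        _, popped = state_map.popitem(last=False)
        latest = popped[PARENT_STATE]
    return latest
```

Popping committed entries as the state advances is what keeps the map from growing with the number of partitions seen, which is the memory concern this PR targets.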
170-177: Good addition of cleanup logic in partition closing.
The integration of `_cleanup_if_done` here ensures proper resource deallocation when partitions complete. The logic correctly checks both completion status and semaphore value before cleanup.
279-312: Excellent implementation of duplicate detection and sequence tracking.
The early return for duplicate partitions prevents processing issues, and the sequence-based tracking with proper cleanup in the finally block looks robust. The logic correctly handles both normal completion and exceptional cases.
🧰 Tools
🪛 Pylint (3.3.7)
[refactor] 292-292: Do not raise StopIteration in generator, use return statement instead
(R1708)
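For context on the R1708 finding above: since Python 3.7 (PEP 479), a `StopIteration` raised inside a generator body is converted into a `RuntimeError`, which is why a plain `return` is the recommended way to end a generator early. The sketch below is a generic illustration with hypothetical function names; it is not the code at line 292 of the PR.

```python
def first_slices_bad(slices, limit):
    """Anti-pattern: raising StopIteration inside a generator body."""
    for index, item in enumerate(slices):
        if index >= limit:
            # Under PEP 479 this surfaces to the caller as RuntimeError.
            raise StopIteration
        yield item


def first_slices_good(slices, limit):
    """Preferred: a bare return ends the generator cleanly."""
    for index, item in enumerate(slices):
        if index >= limit:
            return
        yield item


good = list(first_slices_good(["a", "b", "c"], 2))

try:
    list(first_slices_bad(["a", "b", "c"], 2))
    bad_outcome = "no error"
except RuntimeError:
    bad_outcome = "RuntimeError"
```

The same Pylint check can also fire on a `next(...)` call without a default inside a generator, since that call may raise `StopIteration` implicitly; passing a default or wrapping the call in `try`/`except` avoids it.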
338-338: Smart use of the new index tracking for partition cleanup.
Using `_partition_key_to_index` to identify finished partitions for removal is more efficient than the previous approach. This should improve the partition limit enforcement logic.
471-489: Well-designed cleanup method addresses the memory issues.
This method effectively addresses the PR's objective of fixing memory issues by properly cleaning up all partition-related structures. The logic correctly validates completion conditions before cleanup and maintains consistency across all tracking data structures.
Actionable comments posted: 0
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/interpolation/jinja.py (2)
148-148: Good catch on removing the cache to address memory issues! But should we consider a bounded cache alternative, wdyt?
Removing the `@cache` decorator from `_find_undeclared_variables` makes sense for memory management, especially in concurrent scenarios where many different template strings could accumulate in the cache. However, this might impact performance if the same templates are parsed repeatedly.
Would you consider implementing a bounded cache (like `functools.lru_cache(maxsize=128)`) as a middle ground between memory efficiency and performance, wdyt?
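To make the bounded-cache suggestion concrete, here is a minimal sketch of how `functools.lru_cache` caps memory by evicting the least recently used entry. The `parse_template` function is a hypothetical stand-in for an expensive parse, not the CDK's `_find_undeclared_variables`, and `maxsize=2` is chosen only to make eviction visible.

```python
import functools


@functools.lru_cache(maxsize=2)
def parse_template(template: str) -> frozenset:
    """Hypothetical stand-in for an expensive template parse."""
    return frozenset(word for word in template.split() if word.startswith("$"))


parse_template("$a $b")  # miss: computed and stored
parse_template("$a $b")  # hit: served from the cache
parse_template("$c")     # miss: cache now holds two entries
parse_template("$d")     # miss: evicts "$a $b", the least recently used entry

info = parse_template.cache_info()
```

One caveat when applying this to methods: decorating an instance method with `lru_cache` includes `self` in the cache key and keeps those instances alive while they are cached, so a module-level helper or a per-instance cache may be a better fit.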
156-156: Removing template compilation cache addresses memory concerns - any thoughts on performance impact monitoring, wdyt?
Removing the `@cache` decorator from `_compile` is a solid approach to prevent memory accumulation from cached Template objects, which can be quite large. This change aligns well with the PR's memory fix objectives.
Since template compilation can be computationally expensive, would it be worth adding some performance monitoring or metrics to track if this change introduces any noticeable performance regression in production, wdyt?
- # @cache
+ # @functools.lru_cache(maxsize=100)  # Alternative: bounded cache
  def _compile(self, s: str) -> Template:
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/interpolation/jinja.py
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Publish SDM to DockerHub
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
PyTest Results (Fast): 3 659 tests, 3 649 ✅, 5m 34s ⏱️. Results for commit d3b3f52.
PyTest Results (Full): 3 662 tests, 3 652 ✅, 17m 22s ⏱️. Results for commit d3b3f52.
Actionable comments posted: 0
♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
186-199: Fix the unused variable flagged by static analysis.
The sequence-based logic for parent state updates looks much more efficient than checking all partitions. However, there's still an unused variable that should be addressed. Since we only need the state and sequence, wdyt about this fix?
- earliest_key, (candidate_state, candidate_seq) = next(
+ _, (candidate_state, candidate_seq) = next(
      iter(self._partition_parent_state_map.items())
  )
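As a rough illustration of the pattern in this suggestion, the sketch below peeks at the oldest entry of an insertion-ordered map with `next(iter(...items()))`, discards the key via `_`, and only commits a parent state once no in-flight partition has an equal or lower sequence. Everything here is an assumption made for illustration: the use of `OrderedDict`, the function name, the sample states, and the exact commit rule are not taken from the PR.

```python
from collections import OrderedDict

# Hypothetical data: partition key -> (parent_state, generation_sequence)
parent_state_map = OrderedDict()
parent_state_map["p1"] = ({"updated_at": "2024-01-01"}, 0)
parent_state_map["p2"] = ({"updated_at": "2024-01-02"}, 1)
parent_state_map["p3"] = ({"updated_at": "2024-01-03"}, 2)

# Sequences of partitions still being processed, in ascending order.
in_flight_sequences = [1, 2]


def pop_committable_parent_state(state_map, in_flight):
    """Return the latest parent state whose partition, and all earlier ones, finished."""
    committed = None
    while state_map:
        # The key is not needed, so it is bound to the conventional throwaway name.
        _, (candidate_state, candidate_seq) = next(iter(state_map.items()))
        if in_flight and in_flight[0] <= candidate_seq:
            break  # an equal or earlier partition is still running
        committed = candidate_state
        state_map.popitem(last=False)  # drop the oldest entry to free memory
    return committed


result = pop_committable_parent_state(parent_state_map, in_flight_sequences)
```

Only the oldest entry is inspected on each pass, so the cost is proportional to the number of entries that can actually be committed rather than to the total number of tracked partitions.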
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
102-114: Nice addition of explicit partition lifecycle tracking.
These new data structures provide comprehensive tracking for partition states and sequences, which should help with the memory management issues. The approach using sequence numbers for coordination looks solid.
One minor suggestion - wdyt about using `[]` instead of `list()` on line 110 as suggested by static analysis? It's slightly more idiomatic in Python.
- self._processing_partitions_indexes: List[int] = list()
+ self._processing_partitions_indexes: List[int] = []
🧰 Tools
🪛 Pylint (3.3.7)
[refactor] 110-110: Consider using [] instead of list()
(R1734)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
(7 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (4)
airbyte_cdk/sources/streams/checkpoint/per_partition_key_serializer.py (1)
- PerPartitionKeySerializer (7-22)
airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py (3)
- Timer (50-68)
- iterate_with_last_flag_and_state (17-47)
- stream_slices (102-132)
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (2)
- stream_slices (150-231)
- logger (421-422)
airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py (1)
- stream_slices (61-64)
🪛 Pylint (3.3.7)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
[refactor] 110-110: Consider using [] instead of list()
(R1734)
[refactor] 292-292: Do not raise StopIteration in generator, use return statement instead
(R1708)
⏰ Context from checks skipped due to timeout of 90000ms (13)
- GitHub Check: Pytest (Fast)
- GitHub Check: Build and Inspect Python Package
- GitHub Check: Dependency Analysis with Deptry
- GitHub Check: Ruff Format Check
- GitHub Check: preview_docs
- GitHub Check: update_release_draft
- GitHub Check: MyPy Check
- GitHub Check: Ruff Lint Check
- GitHub Check: Validate PR title
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (7)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (7)
12-12: LGTM on the import addition.
Adding `List` to the type imports supports the new type annotations in the code. Good practice for type safety!
69-70: Great improvement using named constants!
This directly addresses the past review feedback about readability. Using `_PARENT_STATE = 0` and `_GENERATION_SEQUENCE = 1` makes the tuple indexing much clearer than magic numbers.
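A tiny sketch of what the named-index constants buy in practice. The constant names come from the comment above; the `entry` tuple and its contents are hypothetical.

```python
_PARENT_STATE = 0
_GENERATION_SEQUENCE = 1

# Hypothetical (parent_state, generation_sequence) tuple for one partition.
entry = ({"cursor": "2024-05-01"}, 7)

# Reads as intent rather than as entry[0] / entry[1].
parent_state = entry[_PARENT_STATE]
sequence = entry[_GENERATION_SEQUENCE]
```

A `typing.NamedTuple` would be a different way to get the same readability with attribute access, at the cost of changing the stored type.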
176-176: Good integration of cleanup logic.
Calling `_cleanup_if_done` after the global cursor update ensures that completed partitions are properly cleaned up, which should address the memory issues mentioned in the PR title.
279-279: Smart early return for duplicate partitions.
Returning immediately when a duplicate partition is detected prevents unnecessary processing and potential memory issues. This is a clean solution!
284-296: Excellent sequence tracking implementation.
The logic here properly assigns unique sequence numbers to each partition and maintains all the tracking data structures consistently. The sequence-based approach should make parent state coordination much more reliable.
🧰 Tools
🪛 Pylint (3.3.7)
[refactor] 292-292: Do not raise StopIteration in generator, use return statement instead
(R1708)
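The duplicate check and sequence assignment praised in the two comments above could look roughly like the following. This is a simplified sketch under stated assumptions: the class, its attributes, and the JSON-based partition key are all hypothetical and do not mirror the CDK's actual implementation.

```python
import json


class SliceGeneratorSketch:
    """Toy generator wrapper: skips duplicate partitions, numbers new ones."""

    def __init__(self) -> None:
        self._seen_keys: set = set()
        self._next_sequence = 0
        self.key_to_sequence: dict = {}

    def slices_for(self, partition: dict, slices: list):
        # A stable serialization of the partition serves as its key (assumption).
        key = json.dumps(partition, sort_keys=True)
        if key in self._seen_keys:
            return  # duplicate: end the generator without yielding anything
        self._seen_keys.add(key)
        self.key_to_sequence[key] = self._next_sequence
        self._next_sequence += 1
        for item in slices:
            yield key, item


generator = SliceGeneratorSketch()
first = list(generator.slices_for({"id": 1}, ["a", "b"]))
duplicate = list(generator.slices_for({"id": 1}, ["a", "b"]))
second = list(generator.slices_for({"id": 2}, ["c"]))
```

Note that the early exit uses `return` rather than `raise StopIteration`, which also keeps the generator clear of the R1708 warning.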
334-334: Good refinement of partition cleanup detection.
Using `_partition_key_to_index` to determine which partitions can be removed is more precise than the previous approach. This should work well with the new lifecycle tracking.
467-485: Excellent cleanup method implementation.
This is the core of the memory fix and looks very solid. The method:
- Properly checks completion conditions before cleanup
- Maintains consistency across all tracking data structures
- Uses safe operations (pop with default, discard)
- Includes helpful debug logging
The implementation should effectively prevent memory leaks from completed partitions. Nice work on this!
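The properties listed above (check completion first, keep all structures consistent, use `pop` with a default and `discard`, log at debug level) can be sketched as follows. This is a toy model, not the PR's `_cleanup_if_done`: the class and attribute names are hypothetical, and a plain integer counter stands in for the per-partition semaphore so the example does not depend on semaphore internals.

```python
import logging

logger = logging.getLogger("cursor_sketch")


class PartitionTrackerSketch:
    """Toy per-partition bookkeeping with cleanup on completion."""

    def __init__(self) -> None:
        self.pending_slices: dict = {}       # key -> slices not yet closed
        self.finished_keys: set = set()      # keys whose slice generation ended
        self.key_to_sequence: dict = {}      # key -> generation sequence
        self.in_flight_sequences: list = []  # sequences still being processed

    def register(self, key: str, sequence: int, slice_count: int) -> None:
        self.pending_slices[key] = slice_count
        self.key_to_sequence[key] = sequence
        self.in_flight_sequences.append(sequence)

    def mark_generation_done(self, key: str) -> None:
        self.finished_keys.add(key)

    def close_slice(self, key: str) -> None:
        self.pending_slices[key] -= 1
        self.cleanup_if_done(key)

    def cleanup_if_done(self, key: str) -> bool:
        # Both conditions must hold: generation finished and no open slices.
        if key not in self.finished_keys or self.pending_slices.get(key, 0) > 0:
            return False
        # Safe removals: none of these raise if the entry is already gone.
        self.pending_slices.pop(key, None)
        self.finished_keys.discard(key)
        sequence = self.key_to_sequence.pop(key, None)
        if sequence is not None and sequence in self.in_flight_sequences:
            self.in_flight_sequences.remove(sequence)
        logger.debug("Cleaned up partition %s (sequence %s)", key, sequence)
        return True


tracker = PartitionTrackerSketch()
tracker.register("p1", sequence=0, slice_count=2)
tracker.close_slice("p1")            # one slice still open: nothing is removed
still_tracked = "p1" in tracker.pending_slices
tracker.mark_generation_done("p1")
tracker.close_slice("p1")            # last slice closed: all entries are removed
```

Removing the key from every structure in one place is what keeps memory flat as partitions complete; leaving any one map untouched would let it grow with the total number of partitions seen.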
Summary by CodeRabbit
Bug Fixes
Tests