-
Notifications
You must be signed in to change notification settings - Fork 24
fix(low-code): sync records without cursor field in ConcurrentPerPartitionCursor #605
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(low-code): sync records without cursor field in ConcurrentPerPartitionCursor #605
Conversation
📝 WalkthroughWalkthroughThe Additionally, a test was modified to relax strict ordering in record assertions by verifying record presence without enforcing order. Two new parameterized test cases were added to verify that incremental sync correctly handles records missing the cursor field by retaining the initial cursor state for those partitions. Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant ConcurrentPerPartitionCursor
participant Record
User->>ConcurrentPerPartitionCursor: observe(record)
ConcurrentPerPartitionCursor->>ConcurrentPerPartitionCursor: extract cursor value from record
alt Cursor value present
ConcurrentPerPartitionCursor->>ConcurrentPerPartitionCursor: parse and format cursor value
ConcurrentPerPartitionCursor->>ConcurrentPerPartitionCursor: update global/per-partition cursor
else Cursor value missing (ValueError)
ConcurrentPerPartitionCursor-->>User: return early (no state update)
end
Possibly related PRs
Suggested reviewers
Would you like to consider if similar error handling should be added to other cursor-related classes for consistency, wdyt? 📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (1)
🧰 Additional context used🪛 Flake8 (7.2.0)unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py[error] 1469-1469: line too long (109 > 79 characters) (E501) [error] 1478-1478: line too long (86 > 79 characters) (E501) [error] 1499-1499: line too long (100 > 79 characters) (E501) [error] 1501-1501: line too long (143 > 79 characters) (E501) [error] 1513-1513: line too long (82 > 79 characters) (E501) [error] 1514-1514: line too long (84 > 79 characters) (E501) [error] 1515-1515: line too long (98 > 79 characters) (E501) [error] 1520-1520: line too long (129 > 79 characters) (E501) [error] 1522-1522: line too long (100 > 79 characters) (E501) [error] 1528-1528: line too long (91 > 79 characters) (E501) [error] 1545-1545: line too long (92 > 79 characters) (E501) [error] 1548-1548: line too long (87 > 79 characters) (E501) [error] 1558-1558: line too long (84 > 79 characters) (E501) [error] 1565-1565: line too long (84 > 79 characters) (E501) [error] 1576-1576: line too long (93 > 79 characters) (E501) [error] 1584-1584: line too long (80 > 79 characters) (E501) [error] 1593-1593: line too long (95 > 79 characters) (E501) [error] 1594-1594: line too long (82 > 79 characters) (E501) [error] 1595-1595: line too long (84 > 79 characters) (E501) [error] 1598-1598: line too long (95 > 79 characters) (E501) [error] 1610-1610: line too long (109 > 79 characters) (E501) [error] 1619-1619: line too long (86 > 79 characters) (E501) [error] 1640-1640: line too long (100 > 79 characters) (E501) [error] 1642-1642: line too long (143 > 79 characters) (E501) [error] 1652-1652: line too long (100 > 79 characters) (E501) [error] 1654-1654: line too long (82 > 79 characters) (E501) [error] 1655-1655: line too long (84 > 79 characters) (E501) [error] 1661-1661: line too long (91 > 79 characters) (E501) [error] 1677-1677: line too long (92 > 79 characters) (E501) [error] 1680-1680: line too long (87 > 79 characters) (E501) [error] 1690-1690: line too long (84 > 79 characters) (E501) [error] 1697-1697: line too long (84 > 79 characters) (E501) [error] 1708-1708: line too long (93 > 79 characters) (E501) [error] 1716-1716: line too long (80 > 79 characters) (E501) [error] 1725-1725: line too long (95 > 79 characters) (E501) [error] 1726-1726: line too long (82 > 79 characters) (E501) [error] 1727-1727: line too long (84 > 79 characters) (E501) [error] 1730-1730: line too long (95 > 79 characters) (E501) [error] 1731-1731: line too long (84 > 79 characters) (E501) ⏰ Context from checks skipped due to timeout of 90000ms (7)
🔇 Additional comments (2)
✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
451-456
: Excellent defensive programming approach!The guard clause properly handles records without cursor values and aligns with the existing
DatetimeBasedCursor.observe
logic as mentioned in the PR. The early return prevents unnecessary processing and potential downstream errors.One small suggestion - would you consider making the comment slightly more concise? Something like:
- # if the current record has no cursor value, we cannot meaningfully update the state based on it, so there is nothing more to do + # Skip state updates for records without cursor valuesThe current comment is clear, but a shorter version might be easier to scan. What do you think (wdyt)?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-amplitude
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
457-459
: Nice optimization with the pre-extracted value!Using the pre-extracted
record_cursor_value
instead of callingself._cursor_field.extract_value(record)
again is more efficient and maintains consistency with the guard clause above. The flow is now cleaner and avoids duplicate work.
…ized_components_resolver
unit_tests/sources/declarative/resolvers/test_parametrized_components_resolver.py
Show resolved
Hide resolved
Can you please add the unit test for records without a cursor? |
/autofix
|
added |
Could you please also add a test case where none of the records have a cursor? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)
1463-1630
: Great test coverage for the missing cursor field scenario! 🎯This test case perfectly validates the enhancement described in the PR objectives. The logic correctly simulates records without cursor fields and verifies that partitions retain their initial cursor state when records lack the cursor field. The test structure follows the existing patterns well, and the expected behavior aligns with the fix implementation.
However, there are several line length violations flagged by the linter (lines exceeding 79 characters). Would you consider breaking these long lines for better code style consistency? For example, the URL construction on lines 1501, 1513-1515, 1533, 1540, 1547 could be split into multiple lines using string concatenation, wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
(1 hunks)
🧰 Additional context used
🪛 Flake8 (7.2.0)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
[error] 1469-1469: line too long (109 > 79 characters)
(E501)
[error] 1478-1478: line too long (86 > 79 characters)
(E501)
[error] 1499-1499: line too long (100 > 79 characters)
(E501)
[error] 1501-1501: line too long (143 > 79 characters)
(E501)
[error] 1513-1513: line too long (82 > 79 characters)
(E501)
[error] 1514-1514: line too long (84 > 79 characters)
(E501)
[error] 1515-1515: line too long (98 > 79 characters)
(E501)
[error] 1519-1519: line too long (123 > 79 characters)
(E501)
[error] 1524-1524: line too long (86 > 79 characters)
(E501)
[error] 1526-1526: line too long (100 > 79 characters)
(E501)
[error] 1527-1527: line too long (110 > 79 characters)
(E501)
[error] 1533-1533: line too long (129 > 79 characters)
(E501)
[error] 1535-1535: line too long (100 > 79 characters)
(E501)
[error] 1540-1540: line too long (129 > 79 characters)
(E501)
[error] 1542-1542: line too long (100 > 79 characters)
(E501)
[error] 1547-1547: line too long (129 > 79 characters)
(E501)
[error] 1555-1555: line too long (91 > 79 characters)
(E501)
[error] 1572-1572: line too long (92 > 79 characters)
(E501)
[error] 1575-1575: line too long (87 > 79 characters)
(E501)
[error] 1585-1585: line too long (84 > 79 characters)
(E501)
[error] 1592-1592: line too long (84 > 79 characters)
(E501)
[error] 1603-1603: line too long (93 > 79 characters)
(E501)
[error] 1611-1611: line too long (80 > 79 characters)
(E501)
[error] 1620-1620: line too long (95 > 79 characters)
(E501)
[error] 1621-1621: line too long (82 > 79 characters)
(E501)
[error] 1622-1622: line too long (84 > 79 characters)
(E501)
[error] 1625-1625: line too long (95 > 79 characters)
(E501)
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: Check: source-shopify
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
/autofix
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (2)
1463-1630
: Excellent test coverage for missing cursor fields! Could we clean up the line lengths though? 🤔This test case properly validates the scenario where some records lack cursor fields. The logic looks solid - vote 100 without
created_at
should retain initial state while vote 111 withcreated_at
should advance. The expected state correctly reflects this behavior.However, there are multiple line length violations that should be addressed for better code quality, wdyt?
Apply these formatting fixes to comply with line length limits:
- f"https://api.example.com/community/posts?per_page=100&start_time={PARENT_POSTS_CURSOR}", + f"https://api.example.com/community/posts" + f"?per_page=100&start_time={PARENT_POSTS_CURSOR}",And similar fixes for other long lines in the mock requests setup.
1631-1797
: Great comprehensive test for complete cursor field absence! Line length cleanup needed though 🧹This test case thoroughly validates the scenario where all records are missing cursor fields. The expectation that both partitions retain their initial states is exactly right for this edge case.
Same formatting concern here - multiple line length violations should be addressed, wdyt?
Similar line length fixes needed throughout this test case:
- f"https://api.example.com/community/posts?per_page=100&start_time={PARENT_POSTS_CURSOR}", + f"https://api.example.com/community/posts" + f"?per_page=100&start_time={PARENT_POSTS_CURSOR}",And for other long lines in the test setup.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
(1 hunks)
🧰 Additional context used
🪛 Flake8 (7.2.0)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
[error] 1469-1469: line too long (109 > 79 characters)
(E501)
[error] 1478-1478: line too long (86 > 79 characters)
(E501)
[error] 1499-1499: line too long (100 > 79 characters)
(E501)
[error] 1501-1501: line too long (143 > 79 characters)
(E501)
[error] 1513-1513: line too long (82 > 79 characters)
(E501)
[error] 1514-1514: line too long (84 > 79 characters)
(E501)
[error] 1515-1515: line too long (98 > 79 characters)
(E501)
[error] 1519-1519: line too long (123 > 79 characters)
(E501)
[error] 1524-1524: line too long (86 > 79 characters)
(E501)
[error] 1526-1526: line too long (100 > 79 characters)
(E501)
[error] 1527-1527: line too long (110 > 79 characters)
(E501)
[error] 1533-1533: line too long (129 > 79 characters)
(E501)
[error] 1535-1535: line too long (100 > 79 characters)
(E501)
[error] 1540-1540: line too long (129 > 79 characters)
(E501)
[error] 1542-1542: line too long (100 > 79 characters)
(E501)
[error] 1547-1547: line too long (129 > 79 characters)
(E501)
[error] 1555-1555: line too long (91 > 79 characters)
(E501)
[error] 1572-1572: line too long (92 > 79 characters)
(E501)
[error] 1575-1575: line too long (87 > 79 characters)
(E501)
[error] 1585-1585: line too long (84 > 79 characters)
(E501)
[error] 1592-1592: line too long (84 > 79 characters)
(E501)
[error] 1603-1603: line too long (93 > 79 characters)
(E501)
[error] 1611-1611: line too long (80 > 79 characters)
(E501)
[error] 1620-1620: line too long (95 > 79 characters)
(E501)
[error] 1621-1621: line too long (82 > 79 characters)
(E501)
[error] 1622-1622: line too long (84 > 79 characters)
(E501)
[error] 1625-1625: line too long (95 > 79 characters)
(E501)
[error] 1637-1637: line too long (109 > 79 characters)
(E501)
[error] 1646-1646: line too long (86 > 79 characters)
(E501)
[error] 1667-1667: line too long (100 > 79 characters)
(E501)
[error] 1669-1669: line too long (143 > 79 characters)
(E501)
[error] 1679-1679: line too long (100 > 79 characters)
(E501)
[error] 1681-1681: line too long (82 > 79 characters)
(E501)
[error] 1682-1682: line too long (84 > 79 characters)
(E501)
[error] 1687-1687: line too long (123 > 79 characters)
(E501)
[error] 1692-1692: line too long (86 > 79 characters)
(E501)
[error] 1694-1694: line too long (100 > 79 characters)
(E501)
[error] 1695-1695: line too long (110 > 79 characters)
(E501)
[error] 1701-1701: line too long (129 > 79 characters)
(E501)
[error] 1703-1703: line too long (100 > 79 characters)
(E501)
[error] 1708-1708: line too long (129 > 79 characters)
(E501)
[error] 1710-1710: line too long (100 > 79 characters)
(E501)
[error] 1715-1715: line too long (129 > 79 characters)
(E501)
[error] 1723-1723: line too long (91 > 79 characters)
(E501)
[error] 1739-1739: line too long (92 > 79 characters)
(E501)
[error] 1742-1742: line too long (87 > 79 characters)
(E501)
[error] 1752-1752: line too long (84 > 79 characters)
(E501)
[error] 1759-1759: line too long (84 > 79 characters)
(E501)
[error] 1770-1770: line too long (93 > 79 characters)
(E501)
[error] 1778-1778: line too long (80 > 79 characters)
(E501)
[error] 1787-1787: line too long (95 > 79 characters)
(E501)
[error] 1788-1788: line too long (82 > 79 characters)
(E501)
[error] 1789-1789: line too long (84 > 79 characters)
(E501)
[error] 1792-1792: line too long (95 > 79 characters)
(E501)
[error] 1793-1793: line too long (84 > 79 characters)
(E501)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: Check: source-shopify
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
ConcurrentPerPartitionCursor.observe
catch case when record doesn't have a cursor value, skip updating state logic for this record.Same logic as in
DatetimeBasedCursor.observe
: https://github.com/airbytehq/airbyte-python-cdk/blob/main/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py#L155Summary by CodeRabbit