-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(airbyte-cdk): Add Global Parent State Cursor #39593
feat(airbyte-cdk): Add Global Parent State Cursor #39593
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎ 1 Skipped Deployment
|
62e9567
to
e94e1b5
Compare
b7152de
to
b2fd84a
Compare
Regression test for Jira: https://github.com/airbytehq/airbyte/actions/runs/9957227440/job/27508789617 |
@@ -812,6 +812,11 @@ definitions: | |||
description: Set to True if the target API does not accept queries where the start time equal the end time. | |||
type: boolean | |||
default: False | |||
global_parent_cursor: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should try to speak the user's language here.
How about we call this
use_parent_cursor
title: Whether to use the parent stream cursor
description: If the parent stream cursor is kept in sync with this stream's, using the parent stream's cursor allows for more efficient storage of the connection state.
@lmossman thoughts on how make this most useful for users?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I'd modify this slightly:
use_parent_cursor:
title: Use Parent Cursor
description: Make syncs more efficient by using the parent stream's cursor for the connection state.\n\n**WARNING:** This only works if both the parent stream and this child stream are incremental and if the parent stream cursor is kept in sync with this child stream's cursor!
If that sounds correct to you.
Though thinking about this more, I almost feel like we should just have a single boolean field in the builder like Parent cursor is updated when child cursor is updated?
. And if the user enables this, then we set both incremental_dependency
and use_parent_cursor
to true
, since I think they would always want both enabled in that case, and that way the user doesn't really need to know about the internals.
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alternatively, what if we just combined this global_parent_cursor
functionality into the existing incremental_dependency
field? Is there ever a use case to have one enabled and not the other?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@girarda, @lmossman That's not the correct explanation for this parameter. This is what incremental_dependency does. The global cursor is responsible for storing the state as one value instead of per partition.
These parameters can be used separately:
incremental_dependency: true + global_cursor: true
: Incremental streams with dependency.incremental_dependency: true + global_cursor: false
: Per partition but saves state for efficiency, need a lookback.incremental_dependency: false + global_cursor: true
: Not a good idea, as we will always read the parent in the full refresh, and the state will be updated only at the end of the sync.incremental_dependency: false + global_cursor: false
: Per partition with full refresh parent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the use case for this option?
incremental_dependency: true + global_cursor: false
: Per partition but saves state for efficiency, need a lookback.
I thought that the requirement that updates to the child record resulted in updates to the parent record cursor was just for the incremental_dependency
value, and if so it seems like global_cursor: true
would always be desired in that situation, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The use case involves a stream without dependency between cursors. Adding incremental_dependency
may lead to losing records but will help limit parent records, significantly speeding up the sync compared to using just PerPartition without parent state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But in that use case, why would you ever want to have global_cursor: false
? It seems like you would always want to have global_cursor: true
if you are setting incremental_dependency: true
, so that the state is saved more efficiently, right? I feel like I'm missing something here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For example, we can use incremental_dependency
without a global cursor to avoid using the lookback window. This approach may be more efficient if the number of partitions is small.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm confused, you said above:
incremental_dependency: true + global_cursor: false
: Per partition but saves state for efficiency, need a lookback.
But just now you said
we can use
incremental_dependency
without a global cursor to avoid using the lookback window
Which seems contradictory.
It doesn't seem like this would be a straightforward decision for users to make, since I am also struggling to understand the use cases. Is there no way we can simplify this more?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the confusion, I mixed up different cases. I've added more documentation for a better explanation. Here's a short recap:
- Global Cursor: This should be used if there are many partitions, as we cannot store all partitions, so there's no reason to save only part of them.
- Incremental Dependency: This should be used if there is a dependency between parent and child cursors. If we use this field without the dependency, we can miss new child records that were added to old parent records.
However, previously and possibly in the future, we used the incremental dependency approach for streams without dependency. Although this can lead to missing records, it provides a way to read these streams incrementally, significantly increasing performance.
|
||
class GlobalParentCursor(DeclarativeCursor): | ||
""" | ||
The GlobalParentCursor is designed to track the state of parent streams using a single global cursor. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you update the description to make the constraints super clear? The main one I'm worried about is the fact that the parent and child cursors must be in sync
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The dependency between the child and parent cursor is not a requirement for this class. The GlobalParentCursor
can be used to read the parent stream in full refresh mode while using a global cursor for the child stream. This approach is particularly useful when the number of partitions is too large to store individually. By managing the state globally, we can efficiently handle a large number of partitions without exceeding storage limits.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add a high level description of how this feature is used / when should a connector developer use global parent cursor?
I think it's worth documenting in the PR, in this docstring, and in the yaml schema file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update: I see it's in the docs <3
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also added a note on how the state is updated
stream_slice (StreamSlice): The stream slice to be closed. | ||
*args (Any): Additional arguments. | ||
""" | ||
if stream_slice.last_slice: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
where is stream_slice.last_slice
defined? It doesn't seem to be a field on StreamSlice
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In GlobalCursorStreamSlice
|
||
return state | ||
|
||
def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's add a comment mentioning the stream_slice param is ignored
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added
@@ -682,6 +684,12 @@ def _merge_stream_slicers(self, model: DeclarativeStreamModel, config: Config) - | |||
and model.retriever.partition_router | |||
): | |||
stream_slicer_model = model.retriever.partition_router | |||
|
|||
# if model.incremental_sync and hasattr(model.incremental_sync, "global_parent_cursor") and model.incremental_sync.global_parent_cursor: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this commented code block still required?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
@@ -53,12 +53,13 @@ def __ne__(self, other: object) -> bool: | |||
|
|||
|
|||
class StreamSlice(Mapping[str, Any]): | |||
def __init__(self, *, partition: Mapping[str, Any], cursor_slice: Mapping[str, Any]) -> None: | |||
def __init__(self, *, partition: Mapping[str, Any], cursor_slice: Mapping[str, Any], last_slice: bool = False) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is last_slice
only useful for the global parent cursor? If yes, can we keep it scoped to the global parent cursor instead of introducing a new field?
The concern is that removing or correcting a field from public interfaces is very difficult so we should be conservative about what we add. This could be done by specializing the type of StreamSlice yielded by the global parent cursor.
Alternatively, if we think the field will be useful for other cases, I think we should promote the flag_last
method up so the field is always properly set instead of only when using a global parent cursor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Created a new class only for GlobalSubstreamCursor.
airbyte-integrations/connectors/source-jira/source_jira/manifest.yaml
Outdated
Show resolved
Hide resolved
airbyte-integrations/connectors/source-slack/source_slack/manifest.yaml
Outdated
Show resolved
Hide resolved
airbyte-integrations/connectors/source-slack/source_slack/source.py
Outdated
Show resolved
Hide resolved
d235ac5
to
280abcf
Compare
280abcf
to
df0dd6f
Compare
|
||
def __init__(self, *, partition: Mapping[str, Any], cursor_slice: Mapping[str, Any], last_slice: bool = False) -> None: | ||
super().__init__(partition=partition, cursor_slice=cursor_slice) | ||
self.last_slice = last_slice |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is last_slice meant to be immutable? If yes, let's make it either a property or a getter so users can't modify it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
@@ -627,11 +628,12 @@ def create_declarative_stream(self, model: DeclarativeStreamModel, config: Confi | |||
and hasattr(model.incremental_sync, "is_client_side_incremental") | |||
and model.incremental_sync.is_client_side_incremental | |||
): | |||
if combined_slicers and not isinstance(combined_slicers, (DatetimeBasedCursor, PerPartitionCursor)): | |||
if combined_slicers and not isinstance(combined_slicers, (DatetimeBasedCursor, GlobalSubstreamCursor, PerPartitionCursor)): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit let's move this to a set of supported or default slicers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
), | ||
partition_router=stream_slicer, | ||
) | ||
if hasattr(incremental_sync_model, "global_substream_cursor") and incremental_sync_model.global_substream_cursor: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not a problem specific with this PR, but as a general note: this factory code is getting pretty complicated and has some weird coupling scattered throughout the code. We'll probably need to revisit our approach soon-ish
stream_slice (StreamSlice): The stream slice to be closed. | ||
*args (Any): Additional arguments. | ||
""" | ||
if isinstance(stream_slice, GlobalCursorStreamSlice) and stream_slice.last_slice: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we raise an exception if stream_slice is not of the right type? I think that would be indicative of a bug
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added raising an exception
### Usage Guidance | ||
- **Use 'incremental_dependency'**: When the parent stream should be read incrementally. This option should only be used when the update or addition of a new child record updates the parent cursor so only the new data will be synced. If there is no dependency between the child and parent cursor, child records added to old parent records will be missed. | ||
|
||
- **Use 'global_substream_cursor'**: When there are many partitions in the parent stream, and you want to manage the state globally instead of per partition. This simplifies state management and reduces the size of state messages. However, this should only be used with a lookback window to avoid missing records that were added during the sync. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isn't there a risk of losing records if the sync fails?
Example:
- Sync partition A with timestamp = t1
- Fail before syncing partition B which has a timestamp = t2
On the next attempt, we won't try to sync Partition B
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, as the child cursor is updated at the end of the sync.
629fdea
to
015114c
Compare
/approve-regression-tests
|
if self._start: | ||
return int((time.perf_counter_ns() - self._start) // 1e9) | ||
else: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if self._start: | |
return int((time.perf_counter_ns() - self._start) // 1e9) | |
else: | |
if self._start is None: | |
return (time.perf_counter_ns() - self._start) // 1_000_000_000 | |
def start(self) -> None: | |
self._start = time.perf_counter_ns() | |
⚡️ Codeflash found optimizations for this PR📄
|
if self._start: | ||
return int((time.perf_counter_ns() - self._start) // 1e9) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if self._start: | |
return int((time.perf_counter_ns() - self._start) // 1e9) | |
if self._start != -1: | |
return (time.perf_counter_ns() - self._start) // 1_000_000_000 |
⚡️ Codeflash found optimizations for this PR📄
|
What
Add the ability to sync streams with incremental parent streams by tracking the parent cursor as global instead of per partition. Resolves: https://github.com/airbytehq/airbyte-internal-issues/issues/8185
How
Introduce a new class
GlobalParentCursor
that tracks the state of the parent stream as a single cursor when theglobal_parent_cursor
parameter is set for incremental sync.Review guide
airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/global_parent_cursor.py
airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
airbyte-cdk/python/unit_tests/sources/declarative/partition_routers/test_parent_state_stream.py
User Impact
The
Tickets Comments
stream that iterates over tickets can now be implemented for Zendesk Support, resolving Zendesk Support Issue #5254. The following connectors can migrate to low code without breaking changes:IssueComments
andIssueWorklogs
streams to low code #38612Slack: 🚨 🚨 ✨ Source Slack: MigrateThreads
streams to low-code CDK #38618 - There is no dependency between the parent and child, so the global cursor will not be used.Zendesk Support: 🚨 🚨 ✨ Source Zendesk Support: Migrate incremental substreams #38653 - There is no dependency between the parent and child, so the global cursor will not be used.Can this PR be safely reverted and rolled back?