Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(airbyte-cdk): Add Global Parent State Cursor #39593

Merged
merged 33 commits on
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
60d8c79
Add GlobalParentCursor
tolik0 Jun 19, 2024
9467198
Move `global_parent_cursor` to incremental sync
tolik0 Jun 19, 2024
2a5d5e5
Move last slice flag to StreamSlice
tolik0 Jun 19, 2024
f6fbd1d
Fix format
tolik0 Jun 19, 2024
d72b817
Fix docs
tolik0 Jun 19, 2024
af78dda
Add Slack changes
tolik0 Jul 9, 2024
f4f5b78
Small fix for Slack state migration
tolik0 Jul 9, 2024
72fc51f
Add Jira changes
tolik0 Jul 11, 2024
4f4461b
Add local filtering for Global Parent cursor
tolik0 Jul 11, 2024
2ff6f0e
Fix formatting
tolik0 Jul 12, 2024
8b3cf0b
Fix description
tolik0 Jul 12, 2024
bcd0645
Fix warnings
tolik0 Jul 16, 2024
5f15d19
Rename class and update the docs
tolik0 Jul 31, 2024
1d90e13
Fix mypy errors
tolik0 Jul 31, 2024
de255ce
Update docs
tolik0 Aug 1, 2024
85fe4cc
Add unit tests
tolik0 Aug 1, 2024
dd42be7
Delete connector changes
tolik0 Aug 1, 2024
b536e2c
Fix format
tolik0 Aug 1, 2024
23ea1ce
Delete Slack changes
tolik0 Aug 1, 2024
596b9c2
Add docs and fix small errors
tolik0 Aug 2, 2024
9f2d882
Update docs
tolik0 Aug 6, 2024
cdc0d6d
Add lookback window
tolik0 Aug 14, 2024
c2491ec
Update the docs with the lookback window
tolik0 Aug 14, 2024
6883e9b
Update incremental sync docs
tolik0 Aug 15, 2024
a2b210d
Update field description
tolik0 Aug 15, 2024
4cde7f0
Update class doc for GlobalSubstreamCursor
tolik0 Aug 15, 2024
1b60b3e
Refactor for concurrent CDK compatibility
tolik0 Aug 19, 2024
70bc219
Update docstring for stream_slices
tolik0 Aug 19, 2024
eed4423
Add comment with sequence for stream slices
tolik0 Aug 20, 2024
933af1e
Delete wrong change
tolik0 Aug 20, 2024
e4d5172
Fix Timer
tolik0 Sep 6, 2024
015114c
Fix tests
tolik0 Sep 6, 2024
95cd310
Fix formatting
tolik0 Sep 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Previous commit
Next commit
Delete Slack changes
  • Loading branch information
tolik0 committed Sep 6, 2024
commit 23ea1ce72083cfb8969b482284f1382e9d0c6b64

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version: 0.51.42
version: 0.73.0
type: DeclarativeSource

definitions:
Expand Down Expand Up @@ -82,9 +82,6 @@ definitions:
schema_loader:
$ref: "#/definitions/schema_loader"

incremental_stream:
$ref: "#/definitions/stream_base"

users_stream:
primary_key: "id"
retriever:
Expand Down Expand Up @@ -216,15 +213,6 @@ definitions:
request_option:
field_name: "channel"
inject_into: "request_parameter"
transformations:
- type: AddFields
fields:
- path:
- float_ts
value: "{{ record.ts|float }}"
- path:
- channel_id
value: "{{ stream_partition.get('channel') }}"
incremental_sync:
type: DatetimeBasedCursor
cursor_field: float_ts
Expand All @@ -250,40 +238,6 @@ definitions:
type: MinMaxDatetime
datetime: "{{ now_utc().strftime('%Y-%m-%dT%H:%M:%SZ') }}"
datetime_format: "%Y-%m-%dT%H:%M:%SZ"

threads_stream:
$ref: "#/definitions/stream_base"
$parameters:
name: threads
path: conversations.replies
field_path: messages
primary_key:
- channel_id
- ts
retriever:
type: SimpleRetriever
requester:
$ref: "#/definitions/requester"
request_parameters:
channel: "{{ stream_slice['parent_slice']['channel'] }}"
inclusive: "true"
record_selector:
$ref: "#/definitions/selector"
paginator:
$ref: "#/definitions/default_paginator"
partition_router:
type: SubstreamPartitionRouter
parent_stream_configs:
- type: ParentStreamConfig
stream:
$ref: "#/definitions/channel_messages_stream"
parent_key: ts
partition_field: float_ts
incremental_dependency: true
request_option:
type: RequestOption
field_name: "ts"
inject_into: "request_parameter"
transformations:
- type: AddFields
fields:
Expand All @@ -293,28 +247,12 @@ definitions:
- path:
- channel_id
value: "{{ stream_partition.get('channel') }}"
incremental_sync:
type: DatetimeBasedCursor
cursor_field: ts
cursor_datetime_formats:
- "%s_as_float"
datetime_format: "%s_as_float"
start_datetime:
type: MinMaxDatetime
datetime: "{{ config['start_date'] }}"
datetime_format: "%Y-%m-%dT%H:%M:%SZ"
global_parent_cursor: true
is_client_side_incremental: true
state_migrations:
- type: CustomStateMigration
class_name: "source_slack.components.threads_state_migrations.ThreadsStateMigration"

streams:
- "#/definitions/users_stream"
- "#/definitions/channels_stream"
- "#/definitions/channel_members_stream"
- "#/definitions/channel_messages_stream"
- "#/definitions/threads_stream"

check:
type: CheckStream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,21 @@ def get_threads_stream(self, config: Mapping[str, Any]) -> HttpStream:
end_date = end_date and pendulum.parse(end_date)
threads_lookback_window = pendulum.Duration(days=config["lookback_window"])
channel_filter = config.get("channel_filter", [])
include_private_channels = config.get("include_private_channels", False)
threads = Threads(
authenticator=authenticator,
default_start_date=default_start_date,
end_date=end_date,
lookback_window=threads_lookback_window,
channel_filter=channel_filter,
include_private_channels=include_private_channels,
)
return threads

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
declarative_streams = super().streams(config)

# threads_stream = self.get_threads_stream(config)
# declarative_streams.append(threads_stream)
threads_stream = self.get_threads_stream(config)
declarative_streams.append(threads_stream)

return declarative_streams