Skip to content

feat: add IncrementingCountCursor #346

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 28 additions & 7 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
DatetimeBasedCursor as DatetimeBasedCursorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
IncrementingCountCursor as IncrementingCountCursorModel,
)
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
ModelToComponentFactory,
)
Expand Down Expand Up @@ -222,7 +225,7 @@ def _group_streams(
and not incremental_sync_component_definition
)

if self._is_datetime_incremental_without_partition_routing(
if self._is_concurrent_cursor_incremental_without_partition_routing(
declarative_stream, incremental_sync_component_definition
):
stream_state = self._connector_state_manager.get_stream_state(
Expand Down Expand Up @@ -254,15 +257,26 @@ def _group_streams(
stream_slicer=declarative_stream.retriever.stream_slicer,
)
else:
cursor = (
self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
if (
incremental_sync_component_definition
and incremental_sync_component_definition.get("type")
== IncrementingCountCursorModel.__name__
):
cursor = self._constructor.create_concurrent_cursor_from_incrementing_count_cursor(
model_type=IncrementingCountCursorModel,
component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above
stream_name=declarative_stream.name,
stream_namespace=declarative_stream.namespace,
config=config or {},
)
else:
cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
model_type=DatetimeBasedCursorModel,
component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above
stream_name=declarative_stream.name,
stream_namespace=declarative_stream.namespace,
config=config or {},
)
)
partition_generator = StreamSlicerPartitionGenerator(
partition_factory=DeclarativePartitionFactory(
declarative_stream.name,
Expand Down Expand Up @@ -389,19 +403,26 @@ def _group_streams(

return concurrent_streams, synchronous_streams

def _is_datetime_incremental_without_partition_routing(
def _is_concurrent_cursor_incremental_without_partition_routing(
self,
declarative_stream: DeclarativeStream,
incremental_sync_component_definition: Mapping[str, Any] | None,
) -> bool:
return (
incremental_sync_component_definition is not None
and bool(incremental_sync_component_definition)
and incremental_sync_component_definition.get("type", "")
== DatetimeBasedCursorModel.__name__
and (
incremental_sync_component_definition.get("type", "")
in (DatetimeBasedCursorModel.__name__, IncrementingCountCursorModel.__name__)
)
and hasattr(declarative_stream.retriever, "stream_slicer")
and (
isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
# IncrementingCountCursorModel is hardcoded to be of type DatetimeBasedCursor
# add isinstance check here if we want to create a Declarative IncrementingCountCursor
# or isinstance(
# declarative_stream.retriever.stream_slicer, IncrementingCountCursor
# )
or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter)
)
)
Expand Down
39 changes: 39 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,44 @@ definitions:
type:
type: string
enum: [LegacyToPerPartitionStateMigration]
# Schema for the IncrementingCountCursor incremental-sync component.
# Only `type` and `cursor_field` are required; `start_value`, `start_value_option`
# and `$parameters` are optional.
IncrementingCountCursor:
  title: Incrementing Count Cursor
  description: Cursor that allows for incremental sync according to a continuously increasing integer.
  type: object
  required:
    - type
    - cursor_field
  properties:
    type:
      type: string
      enum: [IncrementingCountCursor]
    # Name of the top-level record field used as the bookmark; may be interpolated from config.
    cursor_field:
      title: Cursor Field
      description: The location of the value on a record that will be used as a bookmark during sync. To ensure no data loss, the API must return records in ascending order based on the cursor field. Nested fields are not supported, so the field must be at the top level of the record. You can use a combination of Add Field and Remove Field transformations to move the nested field to the top.
      type: string
      interpolation_context:
        - config
      examples:
        - "created_at"
        - "{{ config['record_cursor'] }}"
    # Accepts either a literal integer or a string (e.g. an interpolation template).
    start_value:
      title: Start Value
      description: The value that determines the earliest record that should be synced.
      anyOf:
        - type: string
        - type: integer
      interpolation_context:
        - config
      examples:
        - 0
        - "{{ config['start_value'] }}"
    start_value_option:
      title: Inject Start Value Into Outgoing HTTP Request
      description: Optionally configures how the start value will be sent in requests to the source API.
      "$ref": "#/definitions/RequestOption"
    $parameters:
      type: object
      additionalProperties: true
DatetimeBasedCursor:
title: Datetime Based Cursor
description: Cursor to provide incremental capabilities over datetime.
Expand Down Expand Up @@ -1318,6 +1356,7 @@ definitions:
anyOf:
- "$ref": "#/definitions/CustomIncrementalSync"
- "$ref": "#/definitions/DatetimeBasedCursor"
- "$ref": "#/definitions/IncrementingCountCursor"
name:
title: Name
description: The stream name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1508,6 +1508,28 @@ class AuthFlow(BaseModel):
oauth_config_specification: Optional[OAuthConfigSpecification] = None


# Pydantic model for the `IncrementingCountCursor` manifest component.
# `type` and `cursor_field` are required; every other field defaults to None.
class IncrementingCountCursor(BaseModel):
    # Discriminator: must be exactly the string "IncrementingCountCursor".
    type: Literal["IncrementingCountCursor"]
    # Required (`...` marks the field as having no default).
    cursor_field: str = Field(
        ...,
        description="The location of the value on a record that will be used as a bookmark during sync. To ensure no data loss, the API must return records in ascending order based on the cursor field. Nested fields are not supported, so the field must be at the top level of the record. You can use a combination of Add Field and Remove Field transformations to move the nested field to the top.",
        examples=["created_at", "{{ config['record_cursor'] }}"],
        title="Cursor Field",
    )
    # May be a literal integer or a string such as an interpolation template.
    start_value: Optional[Union[str, int]] = Field(
        None,
        description="The value that determines the earliest record that should be synced.",
        examples=[0, "{{ config['start_value'] }}"],
        title="Start Value",
    )
    start_value_option: Optional[RequestOption] = Field(
        None,
        description="Optionally configures how the start value will be sent in requests to the source API.",
        title="Inject Start Value Into Outgoing HTTP Request",
    )
    # Populated from the `$parameters` key of the manifest via the field alias.
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class DatetimeBasedCursor(BaseModel):
type: Literal["DatetimeBasedCursor"]
clamping: Optional[Clamping] = Field(
Expand Down Expand Up @@ -1948,7 +1970,9 @@ class Config:
description="Component used to coordinate how records are extracted across stream slices and request pages.",
title="Retriever",
)
incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field(
incremental_sync: Optional[
Union[CustomIncrementalSync, DatetimeBasedCursor, IncrementingCountCursor]
] = Field(
None,
description="Component used to fetch data incrementally based on a time field in the data.",
title="Incremental Sync",
Expand Down
112 changes: 112 additions & 0 deletions airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
HttpResponseFilter as HttpResponseFilterModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
IncrementingCountCursor as IncrementingCountCursorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
InlineSchemaLoader as InlineSchemaLoaderModel,
)
Expand Down Expand Up @@ -496,6 +499,9 @@
CustomFormatConcurrentStreamStateConverter,
DateTimeStreamStateConverter,
)
from airbyte_cdk.sources.streams.concurrent.state_converters.incrementing_count_stream_state_converter import (
IncrementingCountStreamStateConverter,
)
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
from airbyte_cdk.sources.types import Config
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
Expand Down Expand Up @@ -584,6 +590,7 @@ def _init_mappings(self) -> None:
FlattenFieldsModel: self.create_flatten_fields,
DpathFlattenFieldsModel: self.create_dpath_flatten_fields,
IterableDecoderModel: self.create_iterable_decoder,
IncrementingCountCursorModel: self.create_incrementing_count_cursor,
XmlDecoderModel: self.create_xml_decoder,
JsonFileSchemaLoaderModel: self.create_json_file_schema_loader,
DynamicSchemaLoaderModel: self.create_dynamic_schema_loader,
Expand Down Expand Up @@ -1189,6 +1196,70 @@ def create_concurrent_cursor_from_datetime_based_cursor(
clamping_strategy=clamping_strategy,
)

def create_concurrent_cursor_from_incrementing_count_cursor(
    self,
    model_type: Type[BaseModel],
    component_definition: ComponentDefinition,
    stream_name: str,
    stream_namespace: Optional[str],
    config: Config,
    message_repository: Optional[MessageRepository] = None,
    **kwargs: Any,
) -> ConcurrentCursor:
    """Build a ConcurrentCursor from an IncrementingCountCursor manifest definition.

    Args:
        model_type: Model class used to parse ``component_definition``; its
            ``__name__`` must equal the definition's ``type``.
        component_definition: Raw manifest mapping for the cursor component.
        stream_name: Name of the stream the cursor belongs to.
        stream_namespace: Optional namespace of the stream.
        config: Connector config used to evaluate interpolated fields.
        message_repository: Optional override; falls back to the factory's
            own message repository when not provided.
        **kwargs: May contain ``stream_state``, which takes precedence over the
            state held by the connector state manager.

    Returns:
        A ConcurrentCursor using an IncrementingCountStreamStateConverter.

    Raises:
        ValueError: If the definition's type does not match ``model_type``, if
            the parsed model is not an IncrementingCountCursorModel, or if the
            start value does not evaluate to an integer.
    """
    # Per-partition incremental streams can dynamically create child cursors which will pass their current
    # state via the stream_state keyword argument. Incremental syncs without parent streams use the
    # incoming state and connector_state_manager that is initialized when the component factory is created
    stream_state = (
        kwargs["stream_state"]
        if "stream_state" in kwargs
        else self._connector_state_manager.get_stream_state(stream_name, stream_namespace)
    )

    component_type = component_definition.get("type")
    if component_type != model_type.__name__:
        raise ValueError(
            f"Expected manifest component of type {model_type.__name__}, but received {component_type} instead"
        )

    incrementing_count_cursor_model = model_type.parse_obj(component_definition)

    if not isinstance(incrementing_count_cursor_model, IncrementingCountCursorModel):
        raise ValueError(
            f"Expected {model_type.__name__} component, but received {incrementing_count_cursor_model.__class__.__name__}"
        )

    model_parameters = incrementing_count_cursor_model.parameters or {}

    # start_value is declared as Union[str, int] on the model, so it can arrive either as an
    # integer or as a (possibly templated) string. Normalise to str, evaluate it against the
    # config, and coerce the result to int so the cursor receives a concrete starting count
    # rather than an unevaluated InterpolatedString.
    raw_start_value = incrementing_count_cursor_model.start_value
    if raw_start_value is None or raw_start_value == "":
        evaluated_start_value = 0
    else:
        rendered_start_value = InterpolatedString.create(
            str(raw_start_value),
            parameters=model_parameters,
        ).eval(config=config)
        try:
            evaluated_start_value = int(rendered_start_value)
        except (TypeError, ValueError) as err:
            raise ValueError(
                f"Expected start_value of {model_type.__name__} to evaluate to an integer, but received {rendered_start_value!r}"
            ) from err

    interpolated_cursor_field = InterpolatedString.create(
        incrementing_count_cursor_model.cursor_field,
        parameters=model_parameters,
    )
    cursor_field = CursorField(interpolated_cursor_field.eval(config=config))

    connector_state_converter = IncrementingCountStreamStateConverter(
        is_sequential_state=True,  # ConcurrentPerPartitionCursor only works with sequential state
    )

    return ConcurrentCursor(
        stream_name=stream_name,
        stream_namespace=stream_namespace,
        stream_state=stream_state,
        message_repository=message_repository or self._message_repository,
        connector_state_manager=self._connector_state_manager,
        connector_state_converter=connector_state_converter,
        cursor_field=cursor_field,
        slice_boundary_fields=None,
        start=evaluated_start_value,  # type: ignore  # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
        end_provider=connector_state_converter.get_end_provider(),  # type: ignore  # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
    )

def _assemble_weekday(self, weekday: str) -> Weekday:
match weekday:
case "MONDAY":
Expand Down Expand Up @@ -1622,6 +1693,31 @@ def create_declarative_stream(
config=config,
parameters=model.parameters or {},
)
elif model.incremental_sync and isinstance(
model.incremental_sync, IncrementingCountCursorModel
):
cursor_model: IncrementingCountCursorModel = model.incremental_sync # type: ignore

start_time_option = (
self._create_component_from_model(
cursor_model.start_value_option, # type: ignore # mypy still thinks cursor_model of type DatetimeBasedCursor
config,
parameters=cursor_model.parameters or {},
)
if cursor_model.start_value_option # type: ignore # mypy still thinks cursor_model of type DatetimeBasedCursor
else None
)

# The concurrent engine defaults the start/end fields on the slice to "start" and "end", but
# the default DatetimeBasedRequestOptionsProvider() sets them to start_time/end_time
partition_field_start = "start"

request_options_provider = DatetimeBasedRequestOptionsProvider(
start_time_option=start_time_option,
partition_field_start=partition_field_start,
config=config,
parameters=model.parameters or {},
)
else:
request_options_provider = None

Expand Down Expand Up @@ -2111,6 +2207,22 @@ def create_gzip_decoder(
stream_response=False if self._emit_connector_builder_messages else True,
)

@staticmethod
def create_incrementing_count_cursor(
    model: IncrementingCountCursorModel, config: Config, **kwargs: Any
) -> DatetimeBasedCursor:
    """Return a placeholder DatetimeBasedCursor for an IncrementingCountCursor model.

    There is no runtime implementation of an IncrementingCountCursor, but models are
    still parsed into components, so this stub exists only to satisfy that step. It is
    not expected to be used at runtime.

    Known and expected limitation: running a check with a declared
    IncrementingCountCursor goes through this stub, because the check is run without
    ConcurrentCursor.
    """
    # Only cursor_field comes from the model; the datetime settings are fixed dummy values.
    placeholder_cursor = DatetimeBasedCursor(
        cursor_field=model.cursor_field,
        datetime_format="%Y-%m-%d",
        start_datetime="2024-12-12",
        config=config,
        parameters={},
    )
    return placeholder_cursor

@staticmethod
def create_iterable_decoder(
model: IterableDecoderModel, config: Config, **kwargs: Any
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@

from abc import ABC, abstractmethod
from enum import Enum
from typing import TYPE_CHECKING, Any, List, MutableMapping, Optional, Tuple
from typing import TYPE_CHECKING, Any, Callable, List, MutableMapping, Optional, Tuple

if TYPE_CHECKING:
from airbyte_cdk.sources.streams.concurrent.cursor import CursorField


class ConcurrencyCompatibleStateType(Enum):
    """State-type identifiers; each member's value is its string form."""

    # NOTE(review): presumably the state type for datetime-based cursors — confirm against the converters.
    date_range = "date-range"
    # NOTE(review): presumably the state type for the incrementing-count cursor — confirm against the converters.
    integer = "integer"


class AbstractStreamStateConverter(ABC):
Expand Down
Loading
Loading