From b9a421ba15aa108333d8272f486bfe5bd5a6c790 Mon Sep 17 00:00:00 2001
From: Artem Inzhyyants <36314070+artem1205@users.noreply.github.com>
Date: Mon, 3 Jun 2024 17:20:34 +0200
Subject: [PATCH] feat(airbyte-cdk): add client side incremental sync (#38099)

Signed-off-by: Artem Inzhyyants
---
 .../declarative_component_schema.yaml         |   4 +
 .../declarative/extractors/record_filter.py   |  80 ++++++++-
 .../incremental/datetime_based_cursor.py      |  18 +-
 .../models/declarative_component_schema.py    |   5 +
 .../parsers/model_to_component_factory.py     |  34 +++-
 airbyte-cdk/python/unit_tests/__init__.py     |   1 +
 .../extractors/test_record_filter.py          | 155 +++++++++++++++---
 .../test_model_to_component_factory.py        | 155 +++++++++++++++++-
 8 files changed, 415 insertions(+), 37 deletions(-)

diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
index 678e70ae2a2f..ddab9445ee93 100644
--- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
+++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
@@ -801,6 +801,10 @@ definitions:
         title: Whether the target API is formatted as a data feed
         description: A data feed API is an API that does not allow filtering and paginates the content from the most recent to the least recent. Given this, the CDK needs to know when to stop paginating and this field will generate a stop condition for pagination.
         type: boolean
+      is_client_side_incremental:
+        title: Whether the target API does not support filtering and returns all data (the cursor filters records in the client instead of the API side)
+        description: If the target API endpoint does not take cursor values to filter records and returns all records anyway, the connector with this cursor will filter out records locally, and only emit new records from the last sync, hence incremental. This means that all records would be read from the API, but only new records will be emitted to the destination.
+        type: boolean
       lookback_window:
         title: Lookback Window
         description: Time interval before the start_datetime to read data for, e.g. P1M for looking back one month.
diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py
index 78e55408a07a..3af9ddaa7e96 100644
--- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py
+++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py
@@ -1,10 +1,11 @@
 #
 # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 #
-
+import datetime
 from dataclasses import InitVar, dataclass
 from typing import Any, Iterable, Mapping, Optional
 
+from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor, PerPartitionCursor
 from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
 from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
 
@@ -36,3 +37,80 @@ def filter_records(
         for record in records:
             if self._filter_interpolator.eval(self.config, record=record, **kwargs):
                 yield record
+
+
+class ClientSideIncrementalRecordFilterDecorator(RecordFilter):
+    """
+    Applies a filter to a list of records to exclude those that are older than the stream_state/start_date.
+
+    :param DatetimeBasedCursor date_time_based_cursor: Cursor used to extract datetime values
+    :param PerPartitionCursor per_partition_cursor: Optional Cursor used for mapping cursor value in nested stream_state
+    """
+
+    def __init__(
+        self, date_time_based_cursor: DatetimeBasedCursor, per_partition_cursor: Optional[PerPartitionCursor] = None, **kwargs: Any
+    ):
+        super().__init__(**kwargs)
+        self._date_time_based_cursor = date_time_based_cursor
+        self._per_partition_cursor = per_partition_cursor
+
+    @property
+    def _cursor_field(self) -> str:
+        return self._date_time_based_cursor.cursor_field.eval(self._date_time_based_cursor.config)  # type: ignore  # eval returns a string in this context
+
+    @property
+    def _start_date_from_config(self) -> datetime.datetime:
+        return self._date_time_based_cursor._start_datetime.get_datetime(self._date_time_based_cursor.config)
+
+    @property
+    def _end_datetime(self) -> datetime.datetime:
+        return (
+            self._date_time_based_cursor._end_datetime.get_datetime(self._date_time_based_cursor.config)
+            if self._date_time_based_cursor._end_datetime
+            else datetime.datetime.max.replace(tzinfo=datetime.timezone.utc)  # aware, so it can be compared with parsed (UTC) record dates
+        )
+
+    def filter_records(
+        self,
+        records: Iterable[Mapping[str, Any]],
+        stream_state: StreamState,
+        stream_slice: Optional[StreamSlice] = None,
+        next_page_token: Optional[Mapping[str, Any]] = None,
+    ) -> Iterable[Mapping[str, Any]]:
+        state_value = self._get_state_value(stream_state, stream_slice or StreamSlice(partition={}, cursor_slice={}))
+        filter_date: datetime.datetime = self._get_filter_date(state_value)
+        records = (
+            record
+            for record in records
+            if self._end_datetime > self._date_time_based_cursor.parse_date(record[self._cursor_field]) > filter_date
+        )
+        if self.condition:
+            records = super().filter_records(
+                records=records, stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
+            )
+        yield from records
+
+    def _get_state_value(self, stream_state: StreamState, stream_slice: StreamSlice) -> Optional[str]:
+        """
+        Return the cursor_value, or None if it was not found.
+        The cursor_value may be missing if:
+        1. This is the initial sync, so no stream_state exists yet.
+        2. In a parent-child stream the initial sync has already happened, so stream_state is present,
+           but during the second read we receive an extra record from the parent for which no stream_state has been stored yet.
+
+        :param StreamState stream_state: State
+        :param StreamSlice stream_slice: Current Stream slice
+        :return Optional[str]: cursor_value if it was found, otherwise None.
+ """ + if self._per_partition_cursor: + # self._per_partition_cursor is the same object that DeclarativeStream uses to save/update stream_state + partition_state = self._per_partition_cursor.select_state(stream_slice=stream_slice) + return partition_state.get(self._cursor_field) if partition_state else None + return stream_state.get(self._cursor_field) + + def _get_filter_date(self, state_value: Optional[str]) -> datetime.datetime: + start_date_parsed = self._start_date_from_config + if state_value: + return max(start_date_parsed, self._date_time_based_cursor.parse_date(state_value)) + else: + return start_date_parsed diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py index baf794747a0b..495d08db65f0 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py @@ -87,7 +87,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: else datetime.timedelta.max ) self._cursor_granularity = self._parse_timedelta(self.cursor_granularity) - self._cursor_field = InterpolatedString.create(self.cursor_field, parameters=parameters) + self.cursor_field = InterpolatedString.create(self.cursor_field, parameters=parameters) self._lookback_window = InterpolatedString.create(self.lookback_window, parameters=parameters) if self.lookback_window else None self._partition_field_start = InterpolatedString.create(self.partition_field_start or "start_time", parameters=parameters) self._partition_field_end = InterpolatedString.create(self.partition_field_end or "end_time", parameters=parameters) @@ -103,7 +103,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: self.cursor_datetime_formats = [self.datetime_format] def get_stream_state(self) -> StreamState: - return {self._cursor_field.eval(self.config): self._cursor} if self._cursor else {} + return {self.cursor_field.eval(self.config): self._cursor} if self._cursor else {} # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__ def set_initial_state(self, stream_state: StreamState) -> None: """ @@ -112,7 +112,7 @@ def set_initial_state(self, stream_state: StreamState) -> None: :param stream_state: The state of the stream as returned by get_stream_state """ - self._cursor = stream_state.get(self._cursor_field.eval(self.config)) if stream_state else None + self._cursor = stream_state.get(self.cursor_field.eval(self.config)) if stream_state else None # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__ def observe(self, stream_slice: StreamSlice, record: Record) -> None: """ @@ -122,7 +122,7 @@ def observe(self, stream_slice: StreamSlice, record: Record) -> None: :param record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the stream state may need to be deferred depending on whether the source reliably orders records by the cursor field. 
""" - record_cursor_value = record.get(self._cursor_field.eval(self.config)) + record_cursor_value = record.get(self.cursor_field.eval(self.config)) # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__ # if the current record has no cursor value, we cannot meaningfully update the state based on it, so there is nothing more to do if not record_cursor_value: return @@ -186,8 +186,8 @@ def _select_best_end_datetime(self) -> datetime.datetime: return min(self._end_datetime.get_datetime(self.config), now) def _calculate_cursor_datetime_from_state(self, stream_state: Mapping[str, Any]) -> datetime.datetime: - if self._cursor_field.eval(self.config, stream_state=stream_state) in stream_state: - return self.parse_date(stream_state[self._cursor_field.eval(self.config)]) + if self.cursor_field.eval(self.config, stream_state=stream_state) in stream_state: # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__ + return self.parse_date(stream_state[self.cursor_field.eval(self.config)]) # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__ return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc) def _format_datetime(self, dt: datetime.datetime) -> str: @@ -300,7 +300,7 @@ def _get_request_options(self, option_type: RequestOptionType, stream_slice: Opt return options def should_be_synced(self, record: Record) -> bool: - cursor_field = self._cursor_field.eval(self.config) + cursor_field = self.cursor_field.eval(self.config) # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__ record_cursor_value = record.get(cursor_field) if not record_cursor_value: self._send_log( @@ -315,7 +315,7 @@ def should_be_synced(self, record: Record) -> bool: def _is_within_daterange_boundaries( self, record: Record, start_datetime_boundary: Union[datetime.datetime, str], end_datetime_boundary: Union[datetime.datetime, str] ) -> bool: - cursor_field = self._cursor_field.eval(self.config) + cursor_field = self.cursor_field.eval(self.config) # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__ record_cursor_value = record.get(cursor_field) if not record_cursor_value: self._send_log( @@ -339,7 +339,7 @@ def _send_log(self, level: Level, message: str) -> None: ) def is_greater_than_or_equal(self, first: Record, second: Record) -> bool: - cursor_field = self._cursor_field.eval(self.config) + cursor_field = self.cursor_field.eval(self.config) # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__ first_cursor_value = first.get(cursor_field) second_cursor_value = second.get(cursor_field) if first_cursor_value and second_cursor_value: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 5bff43aef9a5..c0c51145ad04 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -937,6 +937,11 @@ class DatetimeBasedCursor(BaseModel): description='A data feed API is an API that does not allow filtering and paginates the content from the most recent to the least recent. 
         title='Whether the target API is formatted as a data feed',
     )
+    is_client_side_incremental: Optional[bool] = Field(
+        None,
+        description='If the target API endpoint does not take cursor values to filter records and returns all records anyway, the connector with this cursor will filter out records locally, and only emit new records from the last sync, hence incremental. This means that all records would be read from the API, but only new records will be emitted to the destination.',
+        title='Whether the target API does not support filtering and returns all data (the cursor filters records in the client instead of the API side)',
+    )
     lookback_window: Optional[str] = Field(
         None,
         description='Time interval before the start_datetime to read data for, e.g. P1M for looking back one month.',
diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
index 5940673de590..6b568321c308 100644
--- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
+++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
@@ -7,7 +7,7 @@
 import importlib
 import inspect
 import re
-from typing import Any, Callable, List, Mapping, Optional, Type, Union, get_args, get_origin, get_type_hints
+from typing import Any, Callable, Dict, List, Mapping, Optional, Type, Union, get_args, get_origin, get_type_hints
 
 from airbyte_cdk.models import Level
 from airbyte_cdk.sources.declarative.auth import DeclarativeOauth2Authenticator, JwtAuthenticator
@@ -27,6 +27,7 @@
 from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
 from airbyte_cdk.sources.declarative.decoders import JsonDecoder
 from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordFilter, RecordSelector
+from airbyte_cdk.sources.declarative.extractors.record_filter import ClientSideIncrementalRecordFilterDecorator
 from airbyte_cdk.sources.declarative.extractors.record_selector import SCHEMA_TRANSFORMER_TYPE_MAPPING
 from airbyte_cdk.sources.declarative.incremental import (
     CursorFactory,
@@ -558,6 +559,8 @@ def create_datetime_based_cursor(self, model: DatetimeBasedCursorModel, config:
         end_datetime: Union[str, MinMaxDatetime, None] = None
         if model.is_data_feed and model.end_datetime:
             raise ValueError("Data feed does not support end_datetime")
+        if model.is_data_feed and model.is_client_side_incremental:
+            raise ValueError("`Client side incremental` cannot be applied with `data feed`. Choose only one of them.")
         if model.end_datetime:
             end_datetime = (
                 model.end_datetime if isinstance(model.end_datetime, str) else self.create_min_max_datetime(model.end_datetime, config)
@@ -611,6 +614,18 @@ def create_declarative_stream(self, model: DeclarativeStreamModel, config: Confi
         stop_condition_on_cursor = (
             model.incremental_sync and hasattr(model.incremental_sync, "is_data_feed") and model.incremental_sync.is_data_feed
         )
+        client_side_incremental_sync = None
+        if (
+            model.incremental_sync
+            and hasattr(model.incremental_sync, "is_client_side_incremental")
+            and model.incremental_sync.is_client_side_incremental
+        ):
+            if combined_slicers and not isinstance(combined_slicers, (DatetimeBasedCursor, PerPartitionCursor)):
+                raise ValueError("Unsupported Slicer is used. A DatetimeBasedCursor or PerPartitionCursor should be used here instead")
+            client_side_incremental_sync = {
+                "date_time_based_cursor": self._create_component_from_model(model=model.incremental_sync, config=config),
+                "per_partition_cursor": combined_slicers if isinstance(combined_slicers, PerPartitionCursor) else None,
+            }
         transformations = []
         if model.transformations:
             for transformation_model in model.transformations:
@@ -622,6 +637,7 @@ def create_declarative_stream(self, model: DeclarativeStreamModel, config: Confi
             primary_key=primary_key,
             stream_slicer=combined_slicers,
             stop_condition_on_cursor=stop_condition_on_cursor,
+            client_side_incremental_sync=client_side_incremental_sync,
             transformations=transformations,
         )
         cursor_field = model.incremental_sync.cursor_field if model.incremental_sync else None
@@ -982,11 +998,19 @@ def create_record_selector(
         config: Config,
         *,
         transformations: List[RecordTransformation],
+        client_side_incremental_sync: Optional[Dict[str, Any]] = None,
         **kwargs: Any,
     ) -> RecordSelector:
         assert model.schema_normalization is not None  # for mypy
         extractor = self._create_component_from_model(model=model.extractor, config=config)
         record_filter = self._create_component_from_model(model.record_filter, config=config) if model.record_filter else None
+        if client_side_incremental_sync:
+            record_filter = ClientSideIncrementalRecordFilterDecorator(
+                config=config,
+                parameters=model.parameters,
+                condition=model.record_filter.condition if model.record_filter else None,
+                **client_side_incremental_sync,
+            )
         schema_normalization = TypeTransformer(SCHEMA_TRANSFORMER_TYPE_MAPPING[model.schema_normalization])
 
         return RecordSelector(
@@ -1038,10 +1062,16 @@ def create_simple_retriever(
         primary_key: Optional[Union[str, List[str], List[List[str]]]],
         stream_slicer: Optional[StreamSlicer],
         stop_condition_on_cursor: bool = False,
+        client_side_incremental_sync: Optional[Dict[str, Any]] = None,
         transformations: List[RecordTransformation],
     ) -> SimpleRetriever:
         requester = self._create_component_from_model(model=model.requester, config=config, name=name)
-        record_selector = self._create_component_from_model(model=model.record_selector, config=config, transformations=transformations)
+        record_selector = self._create_component_from_model(
+            model=model.record_selector,
+            config=config,
+            transformations=transformations,
+            client_side_incremental_sync=client_side_incremental_sync,
+        )
         url_base = model.requester.url_base if hasattr(model.requester, "url_base") else requester.get_url_base()
         stream_slicer = stream_slicer or SinglePartitionRouter(parameters={})
         cursor = stream_slicer if isinstance(stream_slicer, DeclarativeCursor) else None
diff --git a/airbyte-cdk/python/unit_tests/__init__.py b/airbyte-cdk/python/unit_tests/__init__.py
index b6b74b56a60d..51e56f3ad0e1 100644
--- a/airbyte-cdk/python/unit_tests/__init__.py
+++ b/airbyte-cdk/python/unit_tests/__init__.py
@@ -2,5 +2,6 @@
 
 # Import the thing that needs to be imported to stop the tests from falling over
 from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
+
 # "Use" the thing so that the linter doesn't complain
 placeholder = ManifestDeclarativeSource
diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_record_filter.py b/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_record_filter.py
index 2104e4243120..1ce4d249ac69 100644
--- a/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_record_filter.py
+++ b/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_record_filter.py
@@ -1,47 +1,50 @@
 #
 # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 #
+from typing import List, Mapping, Optional
 
 import pytest
-from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
+from airbyte_cdk.sources.declarative.datetime import MinMaxDatetime
+from airbyte_cdk.sources.declarative.extractors.record_filter import ClientSideIncrementalRecordFilterDecorator, RecordFilter
+from airbyte_cdk.sources.declarative.incremental import CursorFactory, DatetimeBasedCursor, PerPartitionCursor
+from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
+from airbyte_cdk.sources.declarative.models import CustomRetriever, DeclarativeStream, ParentStreamConfig, SubstreamPartitionRouter
+from airbyte_cdk.sources.declarative.types import StreamSlice
 
 
 @pytest.mark.parametrize(
-    "test_name, filter_template, records, expected_records",
+    "filter_template, records, expected_records",
     [
         (
-            "test_using_state_filter",
-            "{{ record['created_at'] > stream_state['created_at'] }}",
-            [{"id": 1, "created_at": "06-06-21"}, {"id": 2, "created_at": "06-07-21"}, {"id": 3, "created_at": "06-08-21"}],
-            [{"id": 2, "created_at": "06-07-21"}, {"id": 3, "created_at": "06-08-21"}],
+                "{{ record['created_at'] > stream_state['created_at'] }}",
+                [{"id": 1, "created_at": "06-06-21"}, {"id": 2, "created_at": "06-07-21"}, {"id": 3, "created_at": "06-08-21"}],
+                [{"id": 2, "created_at": "06-07-21"}, {"id": 3, "created_at": "06-08-21"}],
         ),
         (
-            "test_with_slice_filter",
-            "{{ record['last_seen'] >= stream_slice['last_seen'] }}",
-            [{"id": 1, "last_seen": "06-06-21"}, {"id": 2, "last_seen": "06-07-21"}, {"id": 3, "last_seen": "06-10-21"}],
-            [{"id": 3, "last_seen": "06-10-21"}],
+                "{{ record['last_seen'] >= stream_slice['last_seen'] }}",
+                [{"id": 1, "last_seen": "06-06-21"}, {"id": 2, "last_seen": "06-07-21"}, {"id": 3, "last_seen": "06-10-21"}],
+                [{"id": 3, "last_seen": "06-10-21"}],
         ),
         (
-            "test_with_next_page_token_filter",
-            "{{ record['id'] >= next_page_token['last_seen_id'] }}",
-            [{"id": 11}, {"id": 12}, {"id": 13}, {"id": 14}, {"id": 15}],
-            [{"id": 14}, {"id": 15}],
+                "{{ record['id'] >= next_page_token['last_seen_id'] }}",
+                [{"id": 11}, {"id": 12}, {"id": 13}, {"id": 14}, {"id": 15}],
+                [{"id": 14}, {"id": 15}],
        ),
        (
-            "test_missing_filter_fields_return_no_results",
-            "{{ record['id'] >= next_page_token['path_to_nowhere'] }}",
-            [{"id": 11}, {"id": 12}, {"id": 13}, {"id": 14}, {"id": 15}],
-            [],
+                "{{ record['id'] >= next_page_token['path_to_nowhere'] }}",
+                [{"id": 11}, {"id": 12}, {"id": 13}, {"id": 14}, {"id": 15}],
+                [],
        ),
        (
-            "test_using_parameters_filter",
-            "{{ record['created_at'] > parameters['created_at'] }}",
-            [{"id": 1, "created_at": "06-06-21"}, {"id": 2, "created_at": "06-07-21"}, {"id": 3, "created_at": "06-08-21"}],
-            [{"id": 3, "created_at": "06-08-21"}],
+                "{{ record['created_at'] > parameters['created_at'] }}",
+                [{"id": 1, "created_at": "06-06-21"}, {"id": 2, "created_at": "06-07-21"}, {"id": 3, "created_at": "06-08-21"}],
+                [{"id": 3, "created_at": "06-08-21"}],
         ),
     ],
+    ids=["test_using_state_filter", "test_with_slice_filter", "test_with_next_page_token_filter",
+         "test_missing_filter_fields_return_no_results", "test_using_parameters_filter", ]
 )
-def test_record_filter(test_name, filter_template, records, expected_records):
+def test_record_filter(filter_template: str, records: List[Mapping], expected_records: List[Mapping]):
     config = {"response_override": "stop_if_you_see_me"}
"stop_if_you_see_me"} parameters = {"created_at": "06-07-21"} stream_state = {"created_at": "06-06-21"} @@ -53,3 +56,109 @@ def test_record_filter(test_name, filter_template, records, expected_records): records, stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token )) assert actual_records == expected_records + + +@pytest.mark.parametrize( + "stream_state, record_filter_expression, expected_record_ids", + [ + ({}, None, [2, 3]), + ({"created_at": "2021-01-03"}, None, [3]), + ({}, "{{ record['id'] % 2 == 1 }}", [3]), + ], + ids=["no_stream_state_no_record_filter", "with_stream_state_no_record_filter", "no_stream_state_with_record_filter"] +) +def test_client_side_record_filter_decorator_no_parent_stream(stream_state: Optional[Mapping], record_filter_expression: str, + expected_record_ids: List[int]): + records_to_filter = [ + {"id": 1, "created_at": "2020-01-03"}, + {"id": 2, "created_at": "2021-01-03"}, + {"id": 3, "created_at": "2021-01-04"}, + {"id": 4, "created_at": "2021-02-01"}, + ] + date_time_based_cursor = DatetimeBasedCursor( + start_datetime=MinMaxDatetime(datetime="2021-01-01", datetime_format="%Y-%m-%d", parameters={}), + end_datetime=MinMaxDatetime(datetime="2021-01-05", datetime_format="%Y-%m-%d", parameters={}), + step="P10Y", + cursor_field=InterpolatedString.create("created_at", parameters={}), + datetime_format="%Y-%m-%d", + cursor_granularity="P1D", + config={}, + parameters={}, + ) + + record_filter_decorator = ClientSideIncrementalRecordFilterDecorator( + config={}, + condition=record_filter_expression, + parameters={}, + date_time_based_cursor=date_time_based_cursor, + per_partition_cursor=None + ) + + filtered_records = list( + record_filter_decorator.filter_records(records=records_to_filter, stream_state=stream_state, stream_slice={}, next_page_token=None) + ) + + assert [x.get("id") for x in filtered_records] == expected_record_ids + + +@pytest.mark.parametrize( + "stream_state, expected_record_ids", + [ + ({}, [2, 3]), + ({"states": [{"some_parent_id": {"created_at": "2021-01-03"}}]}, [3]), + ], + ids=["no_stream_state_no_record_filter", "with_stream_state_no_record_filter"] +) +def test_client_side_record_filter_decorator_with_parent_stream(stream_state: Optional[Mapping], expected_record_ids: List[int]): + records_to_filter = [ + {"id": 1, "created_at": "2020-01-03"}, + {"id": 2, "created_at": "2021-01-03"}, + {"id": 3, "created_at": "2021-01-04"}, + {"id": 4, "created_at": "2021-02-01"}, + ] + date_time_based_cursor = DatetimeBasedCursor( + start_datetime=MinMaxDatetime(datetime="2021-01-01", datetime_format="%Y-%m-%d", parameters={}), + end_datetime=MinMaxDatetime(datetime="2021-01-05", datetime_format="%Y-%m-%d", parameters={}), + step="P10Y", + cursor_field=InterpolatedString.create("created_at", parameters={}), + datetime_format="%Y-%m-%d", + cursor_granularity="P1D", + config={}, + parameters={}, + ) + per_partition_cursor = PerPartitionCursor( + cursor_factory=CursorFactory( + lambda: date_time_based_cursor), + partition_router=SubstreamPartitionRouter( + type="SubstreamPartitionRouter", + parent_stream_configs=[ + ParentStreamConfig( + type="ParentStreamConfig", + parent_key="id", + partition_field="id", + stream=DeclarativeStream( + type="DeclarativeStream", + retriever=CustomRetriever( + type="CustomRetriever", + class_name="a_class_name" + ) + ) + ) + ] + ), + ) + if stream_state: + per_partition_cursor.set_initial_state({"states": [{"partition": {"id": "some_parent_id", "parent_slice": {}}, "cursor": {'created_at': 
+    record_filter_decorator = ClientSideIncrementalRecordFilterDecorator(
+        config={},
+        parameters={},
+        date_time_based_cursor=date_time_based_cursor,
+        per_partition_cursor=per_partition_cursor
+    )
+    filtered_records = list(
+        record_filter_decorator.filter_records(records=records_to_filter, stream_state=stream_state,
+                                               stream_slice=StreamSlice(partition={"id": "some_parent_id", "parent_slice": {}}, cursor_slice={}),
+                                               next_page_token=None)
+    )
+
+    assert [x.get("id") for x in filtered_records] == expected_record_ids
diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
index fba43c12ea2c..44f39eaccfc9 100644
--- a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
+++ b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
@@ -22,6 +22,7 @@
 from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
 from airbyte_cdk.sources.declarative.decoders import JsonDecoder
 from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordFilter, RecordSelector
+from airbyte_cdk.sources.declarative.extractors.record_filter import ClientSideIncrementalRecordFilterDecorator
 from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor, PerPartitionCursor, ResumableFullRefreshCursor
 from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
 from airbyte_cdk.sources.declarative.models import CheckStream as CheckStreamModel
@@ -535,7 +536,7 @@ def test_datetime_based_cursor():
 
     assert isinstance(stream_slicer, DatetimeBasedCursor)
     assert stream_slicer._step == datetime.timedelta(days=10)
-    assert stream_slicer._cursor_field.string == "created"
+    assert stream_slicer.cursor_field.string == "created"
     assert stream_slicer.cursor_granularity == "PT0.000001S"
     assert stream_slicer._lookback_window.string == "P5D"
     assert stream_slicer.start_time_option.inject_into == RequestOptionType.request_parameter
@@ -651,7 +652,7 @@ def test_stream_with_incremental_and_retriever_with_partition_router():
     assert isinstance(datetime_stream_slicer._end_datetime, MinMaxDatetime)
     assert datetime_stream_slicer._end_datetime.datetime.string == "{{ config['end_time'] }}"
     assert datetime_stream_slicer.step == "P10D"
-    assert datetime_stream_slicer._cursor_field.string == "created"
+    assert datetime_stream_slicer.cursor_field.string == "created"
 
     list_stream_slicer = stream.retriever.stream_slicer._partition_router
     assert isinstance(list_stream_slicer, ListPartitionRouter)
@@ -862,6 +863,156 @@ def test_given_data_feed_and_incremental_then_raise_error():
     )
 
 
+def test_client_side_incremental():
+    content = """
+selector:
+  type: RecordSelector
+  extractor:
+    type: DpathExtractor
+    field_path: ["extractor_path"]
+requester:
+  type: HttpRequester
+  name: "{{ parameters['name'] }}"
+  url_base: "https://api.sendgrid.com/v3/"
+  http_method: "GET"
+list_stream:
+  type: DeclarativeStream
+  incremental_sync:
+    type: DatetimeBasedCursor
+    $parameters:
+      datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z"
+    start_datetime:
+      type: MinMaxDatetime
+      datetime: "{{ config.get('start_date', '1970-01-01T00:00:00.0Z') }}"
+      datetime_format: "%Y-%m-%dT%H:%M:%S.%fZ"
+    cursor_field: "created"
+    is_client_side_incremental: true
+  retriever:
+    type: SimpleRetriever
+    name: "{{ parameters['name'] }}"
+    paginator:
+      type: DefaultPaginator
+      pagination_strategy:
type: "CursorPagination" + cursor_value: "{{ response._metadata.next }}" + page_size: 10 + requester: + $ref: "#/requester" + path: "/" + record_selector: + $ref: "#/selector" + $parameters: + name: "lists" + """ + + parsed_manifest = YamlDeclarativeSource._parse(content) + resolved_manifest = resolver.preprocess_manifest(parsed_manifest) + stream_manifest = transformer.propagate_types_and_parameters("", resolved_manifest["list_stream"], {}) + + stream = factory.create_component(model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=input_config) + + assert isinstance(stream.retriever.record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator) + + +def test_client_side_incremental_with_partition_router(): + content = """ +selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: ["extractor_path"] +requester: + type: HttpRequester + name: "{{ parameters['name'] }}" + url_base: "https://api.sendgrid.com/v3/" + http_method: "GET" +schema_loader: + file_path: "./source_sendgrid/schemas/{{ parameters['name'] }}.yaml" + name: "{{ parameters['stream_name'] }}" +retriever: + requester: + type: "HttpRequester" + path: "kek" + record_selector: + extractor: + field_path: [] +stream_A: + type: DeclarativeStream + name: "A" + primary_key: "id" + $parameters: + retriever: "#/retriever" + url_base: "https://airbyte.io" + schema_loader: "#/schema_loader" +list_stream: + type: DeclarativeStream + incremental_sync: + type: DatetimeBasedCursor + $parameters: + datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z" + start_datetime: + type: MinMaxDatetime + datetime: "{{ config.get('start_date', '1970-01-01T00:00:00.0Z') }}" + datetime_format: "%Y-%m-%dT%H:%M:%S.%fZ" + cursor_field: "created" + is_client_side_incremental: true + retriever: + type: SimpleRetriever + name: "{{ parameters['name'] }}" + partition_router: + type: SubstreamPartitionRouter + parent_stream_configs: + - stream: "#/stream_A" + parent_key: id + partition_field: id + paginator: + type: DefaultPaginator + pagination_strategy: + type: "CursorPagination" + cursor_value: "{{ response._metadata.next }}" + page_size: 10 + requester: + $ref: "#/requester" + path: "/" + record_selector: + $ref: "#/selector" + $parameters: + name: "lists" + """ + + parsed_manifest = YamlDeclarativeSource._parse(content) + resolved_manifest = resolver.preprocess_manifest(parsed_manifest) + stream_manifest = transformer.propagate_types_and_parameters("", resolved_manifest["list_stream"], {}) + + stream = factory.create_component(model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=input_config) + + assert isinstance(stream.retriever.record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator) + assert isinstance(stream.retriever.record_selector.record_filter._per_partition_cursor, PerPartitionCursor) + + +def test_given_data_feed_and_client_side_incremental_then_raise_error(): + content = """ +incremental_sync: + type: DatetimeBasedCursor + $parameters: + datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z" + start_datetime: "{{ config['start_time'] }}" + cursor_field: "created" + is_data_feed: true + is_client_side_incremental: true + """ + + parsed_incremental_sync = YamlDeclarativeSource._parse(content) + resolved_incremental_sync = resolver.preprocess_manifest(parsed_incremental_sync) + datetime_based_cursor_definition = transformer.propagate_types_and_parameters("", resolved_incremental_sync["incremental_sync"], {}) + + with pytest.raises(ValueError) as e: + 
+        factory.create_component(
+            model_type=DatetimeBasedCursorModel, component_definition=datetime_based_cursor_definition, config=input_config
+        )
+    assert e.value.args[0] == "`Client side incremental` cannot be applied with `data feed`. Choose only one of them."
+
+
 @pytest.mark.parametrize(
     "test_name, record_selector, expected_runtime_selector",
    [("test_static_record_selector", "result", "result"), ("test_options_record_selector", "{{ parameters['name'] }}", "lists")],
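
A minimal sketch of how a connector manifest would opt into this behavior. The stream layout, cursor field, and config key below are hypothetical; only `is_client_side_incremental: true` and the `DatetimeBasedCursor` shape come from the schema added in this patch:

    incremental_sync:
      type: DatetimeBasedCursor
      cursor_field: "updated_at"                 # hypothetical field on the API's records
      datetime_format: "%Y-%m-%dT%H:%M:%S.%fZ"
      start_datetime:
        type: MinMaxDatetime
        datetime: "{{ config['start_date'] }}"   # hypothetical config key
        datetime_format: "%Y-%m-%dT%H:%M:%S.%fZ"
      is_client_side_incremental: true           # fetch everything, filter locally

With the flag set, create_declarative_stream wires a ClientSideIncrementalRecordFilterDecorator into the record selector: every page is still requested from the API, but only records whose cursor value is strictly newer than max(start_datetime, saved stream state) — and below end_datetime, when one is configured — are emitted downstream.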