Skip to content

Commit 7dcbc04

Browse files
committed
PoC pagination reset
1 parent 7ab013d commit 7dcbc04

File tree

9 files changed

+118
-48
lines changed

9 files changed

+118
-48
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2057,6 +2057,7 @@ def create_default_stream(
20572057
client_side_incremental_sync={"cursor": concurrent_cursor}
20582058
if self._is_client_side_filtering_enabled(model)
20592059
else None,
2060+
cursor=concurrent_cursor,
20602061
transformations=transformations,
20612062
file_uploader=file_uploader,
20622063
incremental_sync=model.incremental_sync,
@@ -2279,6 +2280,7 @@ def create_default_paginator(
22792280
config: Config,
22802281
*,
22812282
url_base: str,
2283+
cursor: Cursor,
22822284
extractor_model: Optional[Union[CustomRecordExtractorModel, DpathExtractorModel]] = None,
22832285
decoder: Optional[Decoder] = None,
22842286
cursor_used_for_stop_condition: Optional[Cursor] = None,
@@ -2316,6 +2318,7 @@ def create_default_paginator(
23162318
page_token_option=page_token_option,
23172319
pagination_strategy=pagination_strategy,
23182320
url_base=url_base,
2321+
cursor=cursor,
23192322
config=config,
23202323
parameters=model.parameters or {},
23212324
)
@@ -3149,6 +3152,7 @@ def create_simple_retriever(
31493152
config: Config,
31503153
*,
31513154
name: str,
3155+
cursor: Cursor,
31523156
primary_key: Optional[Union[str, List[str], List[List[str]]]],
31533157
request_options_provider: Optional[RequestOptionsProvider] = None,
31543158
stop_condition_cursor: Optional[Cursor] = None,
@@ -3271,6 +3275,7 @@ def _get_url(req: Requester) -> str:
32713275
extractor_model=model.record_selector.extractor,
32723276
decoder=decoder,
32733277
cursor_used_for_stop_condition=stop_condition_cursor or None,
3278+
cursor=cursor,
32743279
)
32753280
if model.paginator
32763281
else NoPagination(parameters={})

airbyte_cdk/sources/declarative/requesters/error_handlers/default_error_handler.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from airbyte_cdk.sources.declarative.requesters.error_handlers.http_response_filter import (
1414
HttpResponseFilter,
1515
)
16-
from airbyte_cdk.sources.streams.http.error_handlers import BackoffStrategy, ErrorHandler
16+
from airbyte_cdk.sources.streams.http.error_handlers import BackoffStrategy, ErrorHandler, ResponseAction
1717
from airbyte_cdk.sources.streams.http.error_handlers.response_models import (
1818
SUCCESS_RESOLUTION,
1919
ErrorResolution,
@@ -106,6 +106,16 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
106106
if not self.response_filters:
107107
self.response_filters = [HttpResponseFilter(config=self.config, parameters={})]
108108

109+
self.response_filters = [
110+
# FIXME: RESET_PAGINATION is not yet wired into the declarative model; this hard-coded filter stands in for what would normally be configured in the manifest.yaml...
111+
HttpResponseFilter(
112+
action=ResponseAction.RESET_PAGINATION,
113+
http_codes={400},
114+
error_message_contains="You cannot access tickets beyond the 300th page",
115+
config=self.config,
116+
parameters={}
117+
)
118+
] + self.response_filters
109119
self._last_request_to_attempt_count: MutableMapping[requests.PreparedRequest, int] = {}
110120

111121
def interpret_response(

airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#
44

55
from dataclasses import InitVar, dataclass, field
6+
from sqlite3 import Cursor
67
from typing import Any, Mapping, MutableMapping, Optional, Union
78

89
import requests
@@ -22,11 +23,9 @@
2223
RequestOptionType,
2324
)
2425
from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
26+
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
2527
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
26-
from airbyte_cdk.utils.mapping_helpers import (
27-
_validate_component_request_option_paths,
28-
get_interpolation_context,
29-
)
28+
from airbyte_cdk.utils.mapping_helpers import _validate_component_request_option_paths
3029

3130

3231
@dataclass
@@ -100,6 +99,7 @@ class DefaultPaginator(Paginator):
10099
"""
101100

102101
pagination_strategy: PaginationStrategy
102+
cursor: Cursor
103103
config: Config
104104
url_base: Union[InterpolatedString, str]
105105
parameters: InitVar[Mapping[str, Any]]
@@ -223,6 +223,9 @@ def _get_request_options(
223223

224224
return options
225225

226+
def generate_stream_slice_on_reset(self, stream_slice: StreamSlice) -> Optional[StreamSlice]:
227+
return self.cursor.reduce_slice_range(stream_slice)
228+
226229

227230
class PaginatorTestReadDecorator(Paginator):
228231
"""
@@ -318,3 +321,10 @@ def get_request_body_json(
318321
return self._decorated.get_request_body_json(
319322
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
320323
)
324+
325+
def generate_stream_slice_on_reset(self, stream_slice: StreamSlice) -> Optional[StreamSlice]:
326+
"""
327+
We assume that this will not happen during test read because the feature relates to very long pagination and
328+
hence we should not hit the maximum_number_of_pages limit.
329+
"""
330+
return stream_slice

airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,6 @@ def next_page_token(
7474
last_page_token_value: Optional[Any],
7575
) -> Optional[Mapping[str, Any]]:
7676
return {}
77+
78+
def generate_stream_slice_on_reset(self, stream_slice: StreamSlice) -> Optional[StreamSlice]:
79+
return stream_slice

airbyte_cdk/sources/declarative/requesters/paginators/paginator.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,7 @@ def path(
6363
:return: path to hit to fetch the next request. Returning None means the path is not defined by the next_page_token
6464
"""
6565
pass
66+
67+
@abstractmethod
68+
def generate_stream_slice_on_reset(self, stream_slice: StreamSlice) -> Optional[StreamSlice]:
69+
pass

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 56 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#
44

55
import json
6+
import logging
67
from collections import defaultdict
78
from dataclasses import InitVar, dataclass, field
89
from functools import partial
@@ -43,11 +44,14 @@
4344
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
4445
from airbyte_cdk.sources.source import ExperimentalClassWarning
4546
from airbyte_cdk.sources.streams.core import StreamData
47+
from airbyte_cdk.sources.streams.http.http_client import PaginationResetRequiredException
4648
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
4749
from airbyte_cdk.utils.mapping_helpers import combine_mappings
4850

4951
FULL_REFRESH_SYNC_COMPLETE_KEY = "__ab_full_refresh_sync_complete"
5052

53+
LOGGER = logging.getLogger("airbyte")
54+
5155

5256
@dataclass
5357
class SimpleRetriever(Retriever):
@@ -392,56 +396,65 @@ def _read_pages(
392396
extra_fields={"query_properties": properties},
393397
)
394398

395-
response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
396-
for current_record in records_generator_fn(response):
397-
if (
398-
current_record
399-
and self.additional_query_properties
400-
and self.additional_query_properties.property_chunking
401-
):
402-
merge_key = (
403-
self.additional_query_properties.property_chunking.get_merge_key(
404-
current_record
399+
try:
400+
response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
401+
for current_record in records_generator_fn(response):
402+
if (
403+
current_record
404+
and self.additional_query_properties
405+
and self.additional_query_properties.property_chunking
406+
):
407+
merge_key = (
408+
self.additional_query_properties.property_chunking.get_merge_key(
409+
current_record
410+
)
405411
)
406-
)
407-
if merge_key:
408-
_deep_merge(merged_records[merge_key], current_record)
412+
if merge_key:
413+
_deep_merge(merged_records[merge_key], current_record)
414+
else:
415+
# We should still emit records even if the record did not have a merge key
416+
last_page_size += 1
417+
last_record = current_record
418+
yield current_record
409419
else:
410-
# We should still emit records even if the record did not have a merge key
411420
last_page_size += 1
412421
last_record = current_record
413422
yield current_record
423+
424+
if (
425+
self.additional_query_properties
426+
and self.additional_query_properties.property_chunking
427+
):
428+
for merged_record in merged_records.values():
429+
record = Record(
430+
data=merged_record, stream_name=self.name, associated_slice=stream_slice
431+
)
432+
last_page_size += 1
433+
last_record = record
434+
yield record
435+
436+
if not response:
437+
pagination_complete = True
414438
else:
415-
last_page_size += 1
416-
last_record = current_record
417-
yield current_record
418-
419-
if (
420-
self.additional_query_properties
421-
and self.additional_query_properties.property_chunking
422-
):
423-
for merged_record in merged_records.values():
424-
record = Record(
425-
data=merged_record, stream_name=self.name, associated_slice=stream_slice
439+
last_page_token_value = (
440+
next_page_token.get("next_page_token") if next_page_token else None
441+
)
442+
next_page_token = self._next_page_token(
443+
response=response,
444+
last_page_size=last_page_size,
445+
last_record=last_record,
446+
last_page_token_value=last_page_token_value,
447+
)
448+
if not next_page_token:
449+
pagination_complete = True
450+
except PaginationResetRequiredException:
451+
initial_token = self._paginator.get_initial_token()
452+
next_page_token: Optional[Mapping[str, Any]] = (
453+
{"next_page_token": initial_token} if initial_token is not None else None
426454
)
427-
last_page_size += 1
428-
last_record = record
429-
yield record
430-
431-
if not response:
432-
pagination_complete = True
433-
else:
434-
last_page_token_value = (
435-
next_page_token.get("next_page_token") if next_page_token else None
436-
)
437-
next_page_token = self._next_page_token(
438-
response=response,
439-
last_page_size=last_page_size,
440-
last_record=last_record,
441-
last_page_token_value=last_page_token_value,
442-
)
443-
if not next_page_token:
444-
pagination_complete = True
455+
previous_slice = stream_slice
456+
stream_slice = self._paginator.generate_stream_slice_on_reset(stream_slice)
457+
LOGGER.info(f"Hitting PaginationReset event. StreamSlice used will go from {previous_slice} to {stream_slice}")
445458

446459
# Always return an empty generator just in case no records were ever yielded
447460
yield from []

airbyte_cdk/sources/streams/concurrent/cursor.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ def stream_slices(self) -> Iterable[StreamSlice]:
8686
"""
8787
yield StreamSlice(partition={}, cursor_slice={})
8888

89+
def reduce_slice_range(self, stream_slice: StreamSlice) -> StreamSlice:
90+
return stream_slice
91+
8992

9093
class FinalStateCursor(Cursor):
9194
"""Cursor that is used to guarantee at least one state message is emitted for a concurrent stream."""
@@ -516,3 +519,17 @@ def _log_for_record_without_cursor_value(self) -> None:
516519
f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced"
517520
)
518521
self._should_be_synced_logger_triggered = True
522+
523+
def reduce_slice_range(self, stream_slice: StreamSlice) -> StreamSlice:
524+
return StreamSlice(
525+
partition=stream_slice.partition,
526+
cursor_slice={
527+
self._slice_boundary_fields_wrapper[
528+
self._START_BOUNDARY
529+
]: self._connector_state_converter.output_format(self._most_recent_cursor_value_per_partition[stream_slice]),
530+
self._slice_boundary_fields_wrapper[
531+
self._END_BOUNDARY
532+
]: stream_slice.cursor_slice[self._slice_boundary_fields_wrapper[self._END_BOUNDARY]],
533+
},
534+
extra_fields=stream_slice.extra_fields,
535+
)

airbyte_cdk/sources/streams/http/error_handlers/response_models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ class ResponseAction(Enum):
1717
FAIL = "FAIL"
1818
IGNORE = "IGNORE"
1919
RATE_LIMITED = "RATE_LIMITED"
20+
RESET_PAGINATION = "RESET_PAGINATION"
2021

2122

2223
@dataclass

airbyte_cdk/sources/streams/http/http_client.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ def monkey_patched_get_item(self, key): # type: ignore # this interface is a co
7979
requests_cache.SQLiteDict.__getitem__ = monkey_patched_get_item # type: ignore # see the method doc for more information
8080

8181

82+
class PaginationResetRequiredException(Exception):
83+
pass
84+
85+
8286
class MessageRepresentationAirbyteTracedErrors(AirbyteTracedException):
8387
"""
8488
Before the migration to the HttpClient in low-code, the exception raised was
@@ -428,6 +432,9 @@ def _handle_error_resolution(
428432
if error_resolution.response_action not in self._ACTIONS_TO_RETRY_ON:
429433
self._evict_key(request)
430434

435+
if error_resolution.response_action == ResponseAction.RESET_PAGINATION:
436+
raise PaginationResetRequiredException()
437+
431438
# Emit stream status RUNNING with the reason RATE_LIMITED to log that the rate limit has been reached
432439
if error_resolution.response_action == ResponseAction.RATE_LIMITED:
433440
# TODO: Update to handle with message repository when concurrent message repository is ready

0 commit comments

Comments
 (0)