Skip to content

Commit 98583c4

Browse files
committed
Switch the ordering of page iteration and property chunking: process property chunks first instead of pages first
1 parent bf998bd commit 98583c4

File tree

1 file changed

+86
-101
lines changed

1 file changed

+86
-101
lines changed

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 86 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -367,14 +367,66 @@ def _read_pages(
367367
{"next_page_token": initial_token} if initial_token is not None else None
368368
)
369369
while not pagination_complete:
370-
response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
370+
property_chunks = (
371+
list(
372+
self.additional_query_properties.get_request_property_chunks(
373+
stream_slice=stream_slice
374+
)
375+
)
376+
if self.additional_query_properties
377+
else [
378+
None
379+
] # A single None property chunk represents the case where property chunking is not configured
380+
)
371381

382+
records_without_merge_key = []
383+
merged_records: MutableMapping[str, Any] = defaultdict(dict)
372384
last_page_size = 0
373385
last_record: Optional[Record] = None
374-
for record in records_generator_fn(response):
375-
last_page_size += 1
376-
last_record = record
377-
yield record
386+
response: Optional[requests.Response] = None
387+
for properties in property_chunks:
388+
if properties:
389+
stream_slice = StreamSlice(
390+
partition=stream_slice.partition or {},
391+
cursor_slice=stream_slice.cursor_slice or {},
392+
extra_fields={"query_properties": properties},
393+
)
394+
395+
response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
396+
for current_record in records_generator_fn(response):
397+
if (
398+
current_record
399+
and self.additional_query_properties
400+
and self.additional_query_properties.property_chunking
401+
):
402+
merge_key = (
403+
self.additional_query_properties.property_chunking.get_merge_key(
404+
current_record
405+
)
406+
)
407+
if merge_key:
408+
merged_records[merge_key].update(current_record)
409+
else:
410+
# We should still emit records even if the record did not have a merge key
411+
last_page_size += 1
412+
last_record = current_record
413+
yield current_record
414+
else:
415+
last_page_size += 1
416+
last_record = current_record
417+
yield current_record
418+
419+
if (
420+
self.additional_query_properties
421+
and self.additional_query_properties.property_chunking
422+
):
423+
for merged_record in merged_records.values():
424+
record = Record(
425+
data=merged_record, stream_name=self.name, associated_slice=stream_slice
426+
)
427+
last_page_size += 1
428+
last_record = record
429+
yield record
378430

379431
if not response:
380432
pagination_complete = True
@@ -449,110 +501,43 @@ def read_records(
449501
:param stream_slice: The stream slice to read data for
450502
:return: The records read from the API source
451503
"""
452-
453-
property_chunks = (
454-
list(
455-
self.additional_query_properties.get_request_property_chunks(
456-
stream_slice=stream_slice
457-
)
458-
)
459-
if self.additional_query_properties
460-
else []
461-
)
462-
records_without_merge_key = []
463-
merged_records: MutableMapping[str, Any] = defaultdict(dict)
464-
465504
_slice = stream_slice or StreamSlice(partition={}, cursor_slice={}) # None-check
505+
466506
most_recent_record_from_slice = None
507+
record_generator = partial(
508+
self._parse_records,
509+
stream_slice=stream_slice,
510+
stream_state=self.state or {},
511+
records_schema=records_schema,
512+
)
467513

468-
if self.additional_query_properties:
469-
for properties in property_chunks:
470-
_slice = StreamSlice(
471-
partition=_slice.partition or {},
472-
cursor_slice=_slice.cursor_slice or {},
473-
extra_fields={"query_properties": properties},
474-
) # None-check
475-
476-
record_generator = partial(
477-
self._parse_records,
478-
stream_slice=_slice,
479-
stream_state=self.state or {},
480-
records_schema=records_schema,
481-
)
514+
if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
515+
stream_state = self.state
482516

483-
for stream_data in self._read_pages(record_generator, self.state, _slice):
484-
current_record = self._extract_record(stream_data, _slice)
485-
if self.cursor and current_record:
486-
self.cursor.observe(_slice, current_record)
517+
# Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
518+
# fetch more records. The platform deletes stream state for full refresh streams before starting a
519+
# new job, so we don't need to worry about this value existing for the initial attempt
520+
if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
521+
return
487522

488-
# Latest record read, not necessarily within slice boundaries.
489-
# TODO Remove once all custom components implement `observe` method.
490-
# https://github.com/airbytehq/airbyte-internal-issues/issues/6955
491-
most_recent_record_from_slice = self._get_most_recent_record(
492-
most_recent_record_from_slice, current_record, _slice
493-
)
523+
yield from self._read_single_page(record_generator, stream_state, _slice)
524+
else:
525+
for stream_data in self._read_pages(record_generator, self.state, _slice):
526+
current_record = self._extract_record(stream_data, _slice)
527+
if self.cursor and current_record:
528+
self.cursor.observe(_slice, current_record)
529+
530+
# Latest record read, not necessarily within slice boundaries.
531+
# TODO Remove once all custom components implement `observe` method.
532+
# https://github.com/airbytehq/airbyte-internal-issues/issues/6955
533+
most_recent_record_from_slice = self._get_most_recent_record(
534+
most_recent_record_from_slice, current_record, _slice
535+
)
536+
yield stream_data
494537

495-
if current_record and self.additional_query_properties.property_chunking:
496-
merge_key = (
497-
self.additional_query_properties.property_chunking.get_merge_key(
498-
current_record
499-
)
500-
)
501-
if merge_key:
502-
merged_records[merge_key].update(current_record)
503-
else:
504-
# We should still emit records even if the record did not have a merge key
505-
records_without_merge_key.append(current_record)
506-
else:
507-
yield stream_data
508538
if self.cursor:
509539
self.cursor.close_slice(_slice, most_recent_record_from_slice)
510-
511-
if len(merged_records) > 0:
512-
yield from [
513-
Record(data=merged_record, stream_name=self.name, associated_slice=stream_slice)
514-
for merged_record in merged_records.values()
515-
]
516-
if len(records_without_merge_key) > 0:
517-
yield from records_without_merge_key
518-
else:
519-
_slice = stream_slice or StreamSlice(partition={}, cursor_slice={}) # None-check
520-
521-
most_recent_record_from_slice = None
522-
record_generator = partial(
523-
self._parse_records,
524-
stream_slice=stream_slice,
525-
stream_state=self.state or {},
526-
records_schema=records_schema,
527-
)
528-
529-
if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
530-
stream_state = self.state
531-
532-
# Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
533-
# fetch more records. The platform deletes stream state for full refresh streams before starting a
534-
# new job, so we don't need to worry about this value existing for the initial attempt
535-
if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
536-
return
537-
538-
yield from self._read_single_page(record_generator, stream_state, _slice)
539-
else:
540-
for stream_data in self._read_pages(record_generator, self.state, _slice):
541-
current_record = self._extract_record(stream_data, _slice)
542-
if self.cursor and current_record:
543-
self.cursor.observe(_slice, current_record)
544-
545-
# Latest record read, not necessarily within slice boundaries.
546-
# TODO Remove once all custom components implement `observe` method.
547-
# https://github.com/airbytehq/airbyte-internal-issues/issues/6955
548-
most_recent_record_from_slice = self._get_most_recent_record(
549-
most_recent_record_from_slice, current_record, _slice
550-
)
551-
yield stream_data
552-
553-
if self.cursor:
554-
self.cursor.close_slice(_slice, most_recent_record_from_slice)
555-
return
540+
return
556541

557542
def _get_most_recent_record(
558543
self,

0 commit comments

Comments
 (0)