Skip to content

Commit ac6cf92

Browse files
authored
feat(low-code concurrent): [ISSUE #10550] have streams without partition routers nor cursor run concurrently (#61)
1 parent acb6630 commit ac6cf92

File tree

6 files changed

+182
-75
lines changed

6 files changed

+182
-75
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 43 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -200,7 +200,11 @@ def _group_streams(
200200
# Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect
201201
# these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
202202
# so we need to treat them as synchronous
203-
if isinstance(declarative_stream, DeclarativeStream):
203+
if (
204+
isinstance(declarative_stream, DeclarativeStream)
205+
and name_to_stream_mapping[declarative_stream.name].get("retriever")["type"]
206+
== "SimpleRetriever"
207+
):
204208
incremental_sync_component_definition = name_to_stream_mapping[
205209
declarative_stream.name
206210
].get("incremental_sync")
@@ -210,36 +214,30 @@ def _group_streams(
210214
.get("retriever")
211215
.get("partition_router")
212216
)
217+
is_without_partition_router_or_cursor = not bool(
218+
incremental_sync_component_definition
219+
) and not bool(partition_router_component_definition)
213220

214221
is_substream_without_incremental = (
215222
partition_router_component_definition
216223
and not incremental_sync_component_definition
217224
)
218225

219-
if (
220-
incremental_sync_component_definition
221-
and incremental_sync_component_definition.get("type", "")
222-
== DatetimeBasedCursorModel.__name__
223-
and self._stream_supports_concurrent_partition_processing(
224-
declarative_stream=declarative_stream
225-
)
226-
and hasattr(declarative_stream.retriever, "stream_slicer")
227-
and isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
226+
if self._is_datetime_incremental_without_partition_routing(
227+
declarative_stream, incremental_sync_component_definition
228228
):
229229
stream_state = state_manager.get_stream_state(
230230
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
231231
)
232232

233-
cursor, connector_state_converter = (
234-
self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
235-
state_manager=state_manager,
236-
model_type=DatetimeBasedCursorModel,
237-
component_definition=incremental_sync_component_definition,
238-
stream_name=declarative_stream.name,
239-
stream_namespace=declarative_stream.namespace,
240-
config=config or {},
241-
stream_state=stream_state,
242-
)
233+
cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
234+
state_manager=state_manager,
235+
model_type=DatetimeBasedCursorModel,
236+
component_definition=incremental_sync_component_definition,
237+
stream_name=declarative_stream.name,
238+
stream_namespace=declarative_stream.namespace,
239+
config=config or {},
240+
stream_state=stream_state,
243241
)
244242

245243
partition_generator = StreamSlicerPartitionGenerator(
@@ -263,14 +261,19 @@ def _group_streams(
263261
json_schema=declarative_stream.get_json_schema(),
264262
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
265263
primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
266-
cursor_field=cursor.cursor_field.cursor_field_key,
264+
cursor_field=cursor.cursor_field.cursor_field_key
265+
if hasattr(cursor, "cursor_field")
266+
and hasattr(
267+
cursor.cursor_field, "cursor_field_key"
268+
) # FIXME this will need to be updated once we do the per partition
269+
else None,
267270
logger=self.logger,
268271
cursor=cursor,
269272
)
270273
)
271-
elif is_substream_without_incremental and hasattr(
272-
declarative_stream.retriever, "stream_slicer"
273-
):
274+
elif (
275+
is_substream_without_incremental or is_without_partition_router_or_cursor
276+
) and hasattr(declarative_stream.retriever, "stream_slicer"):
274277
partition_generator = StreamSlicerPartitionGenerator(
275278
DeclarativePartitionFactory(
276279
declarative_stream.name,
@@ -310,6 +313,22 @@ def _group_streams(
310313

311314
return concurrent_streams, synchronous_streams
312315

316+
def _is_datetime_incremental_without_partition_routing(
317+
self,
318+
declarative_stream: DeclarativeStream,
319+
incremental_sync_component_definition: Mapping[str, Any],
320+
) -> bool:
321+
return (
322+
bool(incremental_sync_component_definition)
323+
and incremental_sync_component_definition.get("type", "")
324+
== DatetimeBasedCursorModel.__name__
325+
and self._stream_supports_concurrent_partition_processing(
326+
declarative_stream=declarative_stream
327+
)
328+
and hasattr(declarative_stream.retriever, "stream_slicer")
329+
and isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
330+
)
331+
313332
def _stream_supports_concurrent_partition_processing(
314333
self, declarative_stream: DeclarativeStream
315334
) -> bool:

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 15 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,6 @@
1717
Mapping,
1818
MutableMapping,
1919
Optional,
20-
Tuple,
2120
Type,
2221
Union,
2322
get_args,
@@ -760,7 +759,7 @@ def create_concurrent_cursor_from_datetime_based_cursor(
760759
config: Config,
761760
stream_state: MutableMapping[str, Any],
762761
**kwargs: Any,
763-
) -> Tuple[ConcurrentCursor, DateTimeStreamStateConverter]:
762+
) -> ConcurrentCursor:
764763
component_type = component_definition.get("type")
765764
if component_definition.get("type") != model_type.__name__:
766765
raise ValueError(
@@ -891,23 +890,20 @@ def create_concurrent_cursor_from_datetime_based_cursor(
891890
if evaluated_step:
892891
step_length = parse_duration(evaluated_step)
893892

894-
return (
895-
ConcurrentCursor(
896-
stream_name=stream_name,
897-
stream_namespace=stream_namespace,
898-
stream_state=stream_state,
899-
message_repository=self._message_repository, # type: ignore # message_repository is always instantiated with a value by factory
900-
connector_state_manager=state_manager,
901-
connector_state_converter=connector_state_converter,
902-
cursor_field=cursor_field,
903-
slice_boundary_fields=slice_boundary_fields,
904-
start=start_date, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
905-
end_provider=end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
906-
lookback_window=lookback_window,
907-
slice_range=step_length,
908-
cursor_granularity=cursor_granularity,
909-
),
910-
connector_state_converter,
893+
return ConcurrentCursor(
894+
stream_name=stream_name,
895+
stream_namespace=stream_namespace,
896+
stream_state=stream_state,
897+
message_repository=self._message_repository, # type: ignore # message_repository is always instantiated with a value by factory
898+
connector_state_manager=state_manager,
899+
connector_state_converter=connector_state_converter,
900+
cursor_field=cursor_field,
901+
slice_boundary_fields=slice_boundary_fields,
902+
start=start_date, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
903+
end_provider=end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
904+
lookback_window=lookback_window,
905+
slice_range=step_length,
906+
cursor_granularity=cursor_granularity,
911907
)
912908

913909
@staticmethod

airbyte_cdk/sources/streams/concurrent/helpers.py

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -13,8 +13,15 @@ def get_primary_key_from_stream(
1313
elif isinstance(stream_primary_key, str):
1414
return [stream_primary_key]
1515
elif isinstance(stream_primary_key, list):
16-
if len(stream_primary_key) > 0 and all(isinstance(k, str) for k in stream_primary_key):
16+
are_all_elements_str = all(isinstance(k, str) for k in stream_primary_key)
17+
are_all_elements_list_of_size_one = all(
18+
isinstance(k, list) and len(k) == 1 for k in stream_primary_key
19+
)
20+
21+
if are_all_elements_str:
1722
return stream_primary_key # type: ignore # We verified all items in the list are strings
23+
elif are_all_elements_list_of_size_one:
24+
return list(map(lambda x: x[0], stream_primary_key))
1825
else:
1926
raise ValueError(f"Nested primary keys are not supported. Found {stream_primary_key}")
2027
else:

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -3062,7 +3062,7 @@ def test_create_concurrent_cursor_from_datetime_based_cursor_all_fields(
30623062
"lookback_window": "P3D",
30633063
}
30643064

3065-
concurrent_cursor, stream_state_converter = (
3065+
concurrent_cursor = (
30663066
connector_builder_factory.create_concurrent_cursor_from_datetime_based_cursor(
30673067
state_manager=connector_state_manager,
30683068
model_type=DatetimeBasedCursorModel,
@@ -3094,6 +3094,7 @@ def test_create_concurrent_cursor_from_datetime_based_cursor_all_fields(
30943094
assert concurrent_cursor._end_provider() == expected_end
30953095
assert concurrent_cursor._concurrent_state == expected_concurrent_state
30963096

3097+
stream_state_converter = concurrent_cursor._connector_state_converter
30973098
assert isinstance(stream_state_converter, CustomFormatConcurrentStreamStateConverter)
30983099
assert stream_state_converter._datetime_format == expected_datetime_format
30993100
assert stream_state_converter._is_sequential_state
@@ -3194,7 +3195,7 @@ def test_create_concurrent_cursor_from_datetime_based_cursor(
31943195
stream_state={},
31953196
)
31963197
else:
3197-
concurrent_cursor, stream_state_converter = (
3198+
concurrent_cursor = (
31983199
connector_builder_factory.create_concurrent_cursor_from_datetime_based_cursor(
31993200
state_manager=connector_state_manager,
32003201
model_type=DatetimeBasedCursorModel,
@@ -3251,7 +3252,7 @@ def test_create_concurrent_cursor_uses_min_max_datetime_format_if_defined():
32513252
"lookback_window": "P3D",
32523253
}
32533254

3254-
concurrent_cursor, stream_state_converter = (
3255+
concurrent_cursor = (
32553256
connector_builder_factory.create_concurrent_cursor_from_datetime_based_cursor(
32563257
state_manager=connector_state_manager,
32573258
model_type=DatetimeBasedCursorModel,

0 commit comments

Comments
 (0)