Skip to content

Commit 24e55be

Browse files
committed
Merge remote-tracking branch 'origin/main' into baz/cdk/oauth2-allow-authentication-using-access-token-value
2 parents 7a5507d + f8054a8 commit 24e55be

File tree

4 files changed

+253
-15
lines changed

4 files changed

+253
-15
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3054,6 +3054,7 @@ definitions:
30543054
interpolation_context:
30553055
- config
30563056
- components_values
3057+
- stream_slice
30573058
- stream_template_config
30583059
examples:
30593060
- ["data"]
@@ -3070,10 +3071,13 @@ definitions:
30703071
- config
30713072
- stream_template_config
30723073
- components_values
3074+
- stream_slice
30733075
examples:
30743076
- "{{ components_values['updates'] }}"
30753077
- "{{ components_values['MetaData']['LastUpdatedTime'] }}"
30763078
- "{{ config['segment_id'] }}"
3079+
- "{{ stream_slice['parent_id'] }}"
3080+
- "{{ stream_slice['extra_fields']['name'] }}"
30773081
value_type:
30783082
title: Value Type
30793083
description: The expected data type of the value. If omitted, the type will be inferred from the value provided.

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2396,7 +2396,7 @@ def create_http_components_resolver(
23962396
config=config,
23972397
name="",
23982398
primary_key=None,
2399-
stream_slicer=combined_slicers,
2399+
stream_slicer=stream_slicer if stream_slicer else combined_slicers,
24002400
transformations=[],
24012401
)
24022402

airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -88,19 +88,25 @@ def resolve_components(
8888
"""
8989
kwargs = {"stream_template_config": stream_template_config}
9090

91-
for components_values in self.retriever.read_records({}):
92-
updated_config = deepcopy(stream_template_config)
93-
kwargs["components_values"] = components_values # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
94-
95-
for resolved_component in self._resolved_components:
96-
valid_types = (
97-
(resolved_component.value_type,) if resolved_component.value_type else None
98-
)
99-
value = resolved_component.value.eval(
100-
self.config, valid_types=valid_types, **kwargs
101-
)
91+
for stream_slice in self.retriever.stream_slices():
92+
for components_values in self.retriever.read_records(
93+
records_schema={}, stream_slice=stream_slice
94+
):
95+
updated_config = deepcopy(stream_template_config)
96+
kwargs["components_values"] = components_values # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
97+
kwargs["stream_slice"] = stream_slice # type: ignore[assignment] # stream_slice will always be of type Mapping[str, Any]
98+
99+
for resolved_component in self._resolved_components:
100+
valid_types = (
101+
(resolved_component.value_type,) if resolved_component.value_type else None
102+
)
103+
value = resolved_component.value.eval(
104+
self.config, valid_types=valid_types, **kwargs
105+
)
102106

103-
path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path]
104-
dpath.set(updated_config, path, value)
107+
path = [
108+
path.eval(self.config, **kwargs) for path in resolved_component.field_path
109+
]
110+
dpath.set(updated_config, path, value)
105111

106-
yield updated_config
112+
yield updated_config

unit_tests/sources/declarative/resolvers/test_http_components_resolver.py

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,133 @@
197197
],
198198
}
199199

200+
_MANIFEST_WITH_HTTP_COMPONENT_RESOLVER_WITH_RETRIEVER_WITH_PARENT_STREAM = {
201+
"version": "6.7.0",
202+
"type": "DeclarativeSource",
203+
"check": {"type": "CheckStream", "stream_names": ["Rates"]},
204+
"dynamic_streams": [
205+
{
206+
"type": "DynamicDeclarativeStream",
207+
"stream_template": {
208+
"type": "DeclarativeStream",
209+
"name": "",
210+
"primary_key": [],
211+
"schema_loader": {
212+
"type": "InlineSchemaLoader",
213+
"schema": {
214+
"$schema": "http://json-schema.org/schema#",
215+
"properties": {
216+
"ABC": {"type": "number"},
217+
"AED": {"type": "number"},
218+
},
219+
"type": "object",
220+
},
221+
},
222+
"retriever": {
223+
"type": "SimpleRetriever",
224+
"requester": {
225+
"type": "HttpRequester",
226+
"url_base": "https://api.test.com",
227+
"path": "",
228+
"http_method": "GET",
229+
"authenticator": {
230+
"type": "ApiKeyAuthenticator",
231+
"header": "apikey",
232+
"api_token": "{{ config['api_key'] }}",
233+
},
234+
},
235+
"record_selector": {
236+
"type": "RecordSelector",
237+
"extractor": {"type": "DpathExtractor", "field_path": []},
238+
},
239+
"paginator": {"type": "NoPagination"},
240+
},
241+
},
242+
"components_resolver": {
243+
"type": "HttpComponentsResolver",
244+
"retriever": {
245+
"type": "SimpleRetriever",
246+
"requester": {
247+
"type": "HttpRequester",
248+
"url_base": "https://api.test.com",
249+
"path": "parent/{{ stream_partition.parent_id }}/items",
250+
"http_method": "GET",
251+
"authenticator": {
252+
"type": "ApiKeyAuthenticator",
253+
"header": "apikey",
254+
"api_token": "{{ config['api_key'] }}",
255+
},
256+
},
257+
"record_selector": {
258+
"type": "RecordSelector",
259+
"extractor": {"type": "DpathExtractor", "field_path": []},
260+
},
261+
"paginator": {"type": "NoPagination"},
262+
"partition_router": {
263+
"type": "SubstreamPartitionRouter",
264+
"parent_stream_configs": [
265+
{
266+
"type": "ParentStreamConfig",
267+
"parent_key": "id",
268+
"partition_field": "parent_id",
269+
"stream": {
270+
"type": "DeclarativeStream",
271+
"name": "parent",
272+
"retriever": {
273+
"type": "SimpleRetriever",
274+
"requester": {
275+
"type": "HttpRequester",
276+
"url_base": "https://api.test.com",
277+
"path": "/parents",
278+
"http_method": "GET",
279+
"authenticator": {
280+
"type": "ApiKeyAuthenticator",
281+
"header": "apikey",
282+
"api_token": "{{ config['api_key'] }}",
283+
},
284+
},
285+
"record_selector": {
286+
"type": "RecordSelector",
287+
"extractor": {
288+
"type": "DpathExtractor",
289+
"field_path": [],
290+
},
291+
},
292+
},
293+
"schema_loader": {
294+
"type": "InlineSchemaLoader",
295+
"schema": {
296+
"$schema": "http://json-schema.org/schema#",
297+
"properties": {"id": {"type": "integer"}},
298+
"type": "object",
299+
},
300+
},
301+
},
302+
}
303+
],
304+
},
305+
},
306+
"components_mapping": [
307+
{
308+
"type": "ComponentMappingDefinition",
309+
"field_path": ["name"],
310+
"value": "parent_{{stream_slice['parent_id']}}_{{components_values['name']}}",
311+
},
312+
{
313+
"type": "ComponentMappingDefinition",
314+
"field_path": [
315+
"retriever",
316+
"requester",
317+
"path",
318+
],
319+
"value": "{{ stream_slice['parent_id'] }}/{{ components_values['id'] }}",
320+
},
321+
],
322+
},
323+
}
324+
],
325+
}
326+
200327

201328
@pytest.mark.parametrize(
202329
"components_mapping, retriever_data, stream_template_config, expected_result",
@@ -221,6 +348,44 @@ def test_http_components_resolver(
221348
):
222349
mock_retriever = MagicMock()
223350
mock_retriever.read_records.return_value = retriever_data
351+
mock_retriever.stream_slices.return_value = [{}]
352+
config = {}
353+
354+
resolver = HttpComponentsResolver(
355+
retriever=mock_retriever,
356+
config=config,
357+
components_mapping=components_mapping,
358+
parameters={},
359+
)
360+
361+
result = list(resolver.resolve_components(stream_template_config=stream_template_config))
362+
assert result == expected_result
363+
364+
365+
@pytest.mark.parametrize(
366+
"components_mapping, retriever_data, stream_template_config, expected_result",
367+
[
368+
(
369+
[
370+
ComponentMappingDefinition(
371+
field_path=[InterpolatedString.create("path", parameters={})],
372+
value="{{stream_slice['parent_id']}}/{{components_values['id']}}",
373+
value_type=str,
374+
parameters={},
375+
)
376+
],
377+
[{"id": "1", "field1": "data1"}, {"id": "2", "field1": "data2"}],
378+
{"path": None},
379+
[{"path": "1/1"}, {"path": "1/2"}, {"path": "2/1"}, {"path": "2/2"}],
380+
)
381+
],
382+
)
383+
def test_http_components_resolver_with_stream_slices(
384+
components_mapping, retriever_data, stream_template_config, expected_result
385+
):
386+
mock_retriever = MagicMock()
387+
mock_retriever.read_records.return_value = retriever_data
388+
mock_retriever.stream_slices.return_value = [{"parent_id": 1}, {"parent_id": 2}]
224389
config = {}
225390

226391
resolver = HttpComponentsResolver(
@@ -305,3 +470,66 @@ def test_duplicated_dynamic_streams_read_with_http_components_resolver():
305470
str(exc_info.value)
306471
== "Dynamic streams list contains a duplicate name: item_2. Please contact Airbyte Support."
307472
)
473+
474+
475+
def test_dynamic_streams_with_http_components_resolver_retriever_with_parent_stream():
476+
expected_stream_names = [
477+
"parent_1_item_1",
478+
"parent_1_item_2",
479+
"parent_2_item_1",
480+
"parent_2_item_2",
481+
]
482+
with HttpMocker() as http_mocker:
483+
http_mocker.get(
484+
HttpRequest(url="https://api.test.com/parents"),
485+
HttpResponse(body=json.dumps([{"id": 1}, {"id": 2}])),
486+
)
487+
parent_ids = [1, 2]
488+
for parent_id in parent_ids:
489+
http_mocker.get(
490+
HttpRequest(url=f"https://api.test.com/parent/{parent_id}/items"),
491+
HttpResponse(
492+
body=json.dumps(
493+
[
494+
{"id": 1, "name": "item_1"},
495+
{"id": 2, "name": "item_2"},
496+
]
497+
)
498+
),
499+
)
500+
dynamic_stream_paths = ["1/1", "2/1", "1/2", "2/2"]
501+
for dynamic_stream_path in dynamic_stream_paths:
502+
http_mocker.get(
503+
HttpRequest(url=f"https://api.test.com/{dynamic_stream_path}"),
504+
HttpResponse(body=json.dumps([{"ABC": 1, "AED": 2}])),
505+
)
506+
507+
source = ConcurrentDeclarativeSource(
508+
source_config=_MANIFEST_WITH_HTTP_COMPONENT_RESOLVER_WITH_RETRIEVER_WITH_PARENT_STREAM,
509+
config=_CONFIG,
510+
catalog=None,
511+
state=None,
512+
)
513+
514+
actual_catalog = source.discover(logger=source.logger, config=_CONFIG)
515+
516+
configured_streams = [
517+
to_configured_stream(stream, primary_key=stream.source_defined_primary_key)
518+
for stream in actual_catalog.streams
519+
]
520+
configured_catalog = to_configured_catalog(configured_streams)
521+
522+
records = [
523+
message.record
524+
for message in source.read(MagicMock(), _CONFIG, configured_catalog)
525+
if message.type == Type.RECORD
526+
]
527+
528+
assert len(actual_catalog.streams) == 4
529+
assert [stream.name for stream in actual_catalog.streams] == expected_stream_names
530+
assert len(records) == 4
531+
532+
actual_record_stream_names = [record.stream for record in records]
533+
actual_record_stream_names.sort()
534+
535+
assert actual_record_stream_names == expected_stream_names

0 commit comments

Comments
 (0)