
Commit 9a075a1

dbgold17 and Copilot authored
feat: additional interpolation contexts for download step of async job (#757)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent e9d1ff0 commit 9a075a1


5 files changed: +59 / -17 lines changed


airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 0 additions & 1 deletion
@@ -3797,7 +3797,6 @@ definitions:
       - polling_requester
       - download_requester
       - status_extractor
-      - download_target_extractor
     properties:
       type:
         type: string
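With `download_target_extractor` removed from the `required` list, a declarative manifest may omit it entirely. Below is a minimal hypothetical sketch of a download step under that assumption; the endpoint, the `id` key in the creation response, and the elided sibling fields are placeholders, not part of this commit.

```yaml
# Hypothetical sketch: an AsyncRetriever download step with no download_target_extractor.
# The endpoint and the creation_response "id" key are assumed for illustration only.
retriever:
  type: AsyncRetriever
  # creation_requester, polling_requester, status_extractor, status_mapping,
  # record_selector, ... omitted for brevity
  download_requester:
    type: HttpRequester
    url_base: "https://api.example.com"
    path: "/jobs/{{ creation_response['id'] }}/download"
    http_method: GET
```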

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 2 additions & 2 deletions
@@ -2852,8 +2852,8 @@ class AsyncRetriever(BaseModel):
     status_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field(
         ..., description="Responsible for fetching the actual status of the async job."
     )
-    download_target_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field(
-        ...,
+    download_target_extractor: Optional[Union[DpathExtractor, CustomRecordExtractor]] = Field(
+        None,
         description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.",
     )
     download_extractor: Optional[

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 14 additions & 5 deletions
@@ -3467,6 +3467,11 @@ def create_async_retriever(
         transformations: List[RecordTransformation],
         **kwargs: Any,
     ) -> AsyncRetriever:
+        if model.download_target_requester and not model.download_target_extractor:
+            raise ValueError(
+                f"`download_target_extractor` required if using a `download_target_requester`"
+            )
+
         def _get_download_retriever(
             requester: Requester, extractor: RecordExtractor, _decoder: Decoder
         ) -> SimpleRetriever:
@@ -3624,11 +3629,15 @@ def _get_job_timeout() -> datetime.timedelta:
         status_extractor = self._create_component_from_model(
             model=model.status_extractor, decoder=decoder, config=config, name=name
         )
-        download_target_extractor = self._create_component_from_model(
-            model=model.download_target_extractor,
-            decoder=decoder,
-            config=config,
-            name=name,
+        download_target_extractor = (
+            self._create_component_from_model(
+                model=model.download_target_extractor,
+                decoder=decoder,
+                config=config,
+                name=name,
+            )
+            if model.download_target_extractor
+            else None
         )

         job_repository: AsyncJobRepository = AsyncHttpJobRepository(
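The new guard above rejects a configuration that supplies a `download_target_requester` without a `download_target_extractor`. A hedged YAML sketch of the pairing the factory now expects follows; the URL, path, and the `urls` key in the response are assumptions for illustration only.

```yaml
# Hypothetical valid pairing: a dedicated request for download targets plus the
# extractor that turns its response into `download_target` values.
download_target_requester:
  type: HttpRequester
  url_base: "https://api.example.com"
  path: "/jobs/files"        # illustrative endpoint returning e.g. {"urls": [...]}
  http_method: GET
download_target_extractor:
  type: DpathExtractor
  field_path: ["urls"]       # assumed key holding the list of download URLs
```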

airbyte_cdk/sources/declarative/requesters/README.md

Lines changed: 14 additions & 3 deletions
@@ -1,8 +1,19 @@
+# Download Target and Download Requester
+
+- The `creation_response` and `polling_response` interpolation contexts are always available during the job download step of the process.
+
+- The `download_target` interpolation context is generated by the `download_target_extractor` and made available to the job download step as well.
+- if `download_target_requester` is not provided, `download_target_extractor` will get urls from the `polling_response`
+- if `download_target_requester` is provided, an additional request will be made to fetch job download targets and `download_target_extractor` will operate on that response
+
+## Some important considerations
+
+- **Note:** If the `download_target_extractor` and `download_target_requester` are not defined, a single job download request will be made without the `download_target` context.
+- **Note:** The `download_target_extractor` is required (not optional) if using a `download_target_requester`.
+
 # AsyncHttpJobRepository sequence diagram

 - Components marked as optional are not required and can be ignored.
-- if `download_target_requester` is not provided, `download_target_extractor` will get urls from the `polling_response`
-- interpolation_context, e.g. `creation_response` or `polling_response` can be obtained from stream_slice

 ```mermaid
 ---
@@ -37,7 +48,7 @@ sequenceDiagram
         UrlRequester -->> AsyncHttpJobRepository: Download URLs

     AsyncHttpJobRepository ->> DownloadRetriever: Download reports
-    DownloadRetriever ->> Reporting Server: Retrieve report data (interpolation_context: `url`)
+    DownloadRetriever ->> Reporting Server: Retrieve report data (interpolation_context: `download_target`, `creation_response`, `polling_response`)
     Reporting Server -->> DownloadRetriever: Report data
     DownloadRetriever -->> AsyncHttpJobRepository: Report data
     else Status: Failed
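To make the updated diagram step concrete, here is a hedged sketch of a `download_requester` that touches all three interpolation contexts named above. The header and parameter names, as well as the `id` and `format` keys, are invented for illustration; only the context names come from this change.

```yaml
# Hypothetical download_requester using the download-step interpolation contexts.
download_requester:
  type: HttpRequester
  url_base: "{{ download_target }}"               # value produced by download_target_extractor
  http_method: GET
  request_headers:
    X-Job-Id: "{{ creation_response['id'] }}"     # assumed key in the job creation response
  request_parameters:
    format: "{{ polling_response['format'] }}"    # assumed key in the latest polling response
```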

airbyte_cdk/sources/declarative/requesters/http_job_repository.py

Lines changed: 29 additions & 6 deletions
@@ -43,7 +43,7 @@ class AsyncHttpJobRepository(AsyncJobRepository):
     delete_requester: Optional[Requester]
     status_extractor: DpathExtractor
     status_mapping: Mapping[str, AsyncJobStatus]
-    download_target_extractor: DpathExtractor
+    download_target_extractor: Optional[DpathExtractor]

     # timeout for the job to be completed, passed from `polling_job_timeout`
     job_timeout: Optional[timedelta] = None
@@ -213,14 +213,16 @@ def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]:

         """

-        for target_url in self._get_download_targets(job):
+        for download_target in self._get_download_targets(job):
             job_slice = job.job_parameters()
             stream_slice = StreamSlice(
                 partition=job_slice.partition,
                 cursor_slice=job_slice.cursor_slice,
                 extra_fields={
                     **job_slice.extra_fields,
-                    "download_target": target_url,
+                    "download_target": download_target,
+                    "creation_response": self._get_creation_response_interpolation_context(job),
+                    "polling_response": self._get_polling_response_interpolation_context(job),
                 },
             )
             for message in self.download_retriever.read_records({}, stream_slice):
@@ -330,9 +332,27 @@ def _get_create_job_stream_slice(self, job: AsyncJob) -> StreamSlice:
         )

     def _get_download_targets(self, job: AsyncJob) -> Iterable[str]:
-        if not self.download_target_requester:
-            url_response = self._polling_job_response_by_id[job.api_job_id()]
-        else:
+        """Returns an iterable of strings to help target requests for downloading async jobs."""
+        # If neither download_target_extractor nor download_target_requester are provided, yield a single empty string
+        # to express the need to make a single download request without any download_target value
+        if not self.download_target_extractor:
+            if not self.download_target_requester:
+                lazy_log(
+                    LOGGER,
+                    logging.DEBUG,
+                    lambda: "No download_target_extractor or download_target_requester provided. Will attempt a single download request without a `download_target`.",
+                )
+                yield ""
+                return
+            else:
+                raise AirbyteTracedException(
+                    internal_message="Must define a `download_target_extractor` when using a `download_target_requester`.",
+                    failure_type=FailureType.config_error,
+                )
+
+        # We have a download_target_extractor, use it to extract the download_target
+        if self.download_target_requester:
+            # if a download_target_requester is defined, we extract from the response of a request specifically for download targets.
             stream_slice: StreamSlice = StreamSlice(
                 partition={},
                 cursor_slice={},
@@ -346,5 +366,8 @@ def _get_download_targets(self, job: AsyncJob) -> Iterable[str]:
                 internal_message="Always expect a response or an exception from download_target_requester",
                 failure_type=FailureType.system_error,
             )
+        else:
+            # if no download_target_requester is defined, we extract from the polling response
+            url_response = self._polling_job_response_by_id[job.api_job_id()]

         yield from self.download_target_extractor.extract_records(url_response) # type: ignore # we expect download_target_extractor to always return list of strings
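For the branch where only `download_target_extractor` is defined, URLs are pulled from the polling response and surfaced one by one as `download_target`. A hedged manifest sketch of that configuration follows; the `result_urls` key is an assumption.

```yaml
# Hypothetical "extractor only" configuration: no download_target_requester,
# so the extractor reads URLs straight from the polling response.
download_target_extractor:
  type: DpathExtractor
  field_path: ["result_urls"]      # assumed key in the polling response
download_requester:
  type: HttpRequester
  url_base: "{{ download_target }}"
  http_method: GET
```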
