Skip to content

Commit 4dbb6fe

Browse files
author
Baz
authored
fix: (CDK) (AsyncRetriever) - Add the request and response to each async operation (#356)
1 parent 1ad437e commit 4dbb6fe

File tree

3 files changed

+53
-7
lines changed

3 files changed

+53
-7
lines changed

airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ def _read_with_chunks(
136136
"""
137137

138138
try:
139+
# TODO: Add support for other file types, like `json`, with `pd.read_json()`
139140
with open(path, "r", encoding=file_encoding) as data:
140141
chunks = pd.read_csv(
141142
data, chunksize=chunk_size, iterator=True, dialect="unix", dtype=object

airbyte_cdk/sources/declarative/requesters/http_job_repository.py

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
)
2424
from airbyte_cdk.sources.declarative.requesters.requester import Requester
2525
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
26+
from airbyte_cdk.sources.http_logger import format_http_message
2627
from airbyte_cdk.sources.types import Record, StreamSlice
2728
from airbyte_cdk.utils import AirbyteTracedException
2829

@@ -71,7 +72,15 @@ def _get_validated_polling_response(self, stream_slice: StreamSlice) -> requests
7172
"""
7273

7374
polling_response: Optional[requests.Response] = self.polling_requester.send_request(
74-
stream_slice=stream_slice
75+
stream_slice=stream_slice,
76+
log_formatter=lambda polling_response: format_http_message(
77+
response=polling_response,
78+
title="Async Job -- Polling",
79+
description="Poll the status of the server-side async job.",
80+
stream_name=None,
81+
is_auxiliary=True,
82+
type="ASYNC_POLL",
83+
),
7584
)
7685
if polling_response is None:
7786
raise AirbyteTracedException(
@@ -118,8 +127,17 @@ def _start_job_and_validate_response(self, stream_slice: StreamSlice) -> request
118127
"""
119128

120129
response: Optional[requests.Response] = self.creation_requester.send_request(
121-
stream_slice=stream_slice
130+
stream_slice=stream_slice,
131+
log_formatter=lambda response: format_http_message(
132+
response=response,
133+
title="Async Job -- Create",
134+
description="Create the server-side async job.",
135+
stream_name=None,
136+
is_auxiliary=True,
137+
type="ASYNC_CREATE",
138+
),
122139
)
140+
123141
if not response:
124142
raise AirbyteTracedException(
125143
internal_message="Always expect a response or an exception from creation_requester",
@@ -217,13 +235,33 @@ def abort(self, job: AsyncJob) -> None:
217235
if not self.abort_requester:
218236
return
219237

220-
self.abort_requester.send_request(stream_slice=self._get_create_job_stream_slice(job))
238+
abort_response = self.abort_requester.send_request(
239+
stream_slice=self._get_create_job_stream_slice(job),
240+
log_formatter=lambda abort_response: format_http_message(
241+
response=abort_response,
242+
title="Async Job -- Abort",
243+
description="Abort the running server-side async job.",
244+
stream_name=None,
245+
is_auxiliary=True,
246+
type="ASYNC_ABORT",
247+
),
248+
)
221249

222250
def delete(self, job: AsyncJob) -> None:
223251
if not self.delete_requester:
224252
return
225253

226-
self.delete_requester.send_request(stream_slice=self._get_create_job_stream_slice(job))
254+
delete_job_reponse = self.delete_requester.send_request(
255+
stream_slice=self._get_create_job_stream_slice(job),
256+
log_formatter=lambda delete_job_reponse: format_http_message(
257+
response=delete_job_reponse,
258+
title="Async Job -- Delete",
259+
description="Delete the specified job from the list of Jobs.",
260+
stream_name=None,
261+
is_auxiliary=True,
262+
type="ASYNC_DELETE",
263+
),
264+
)
227265
self._clean_up_job(job.api_job_id())
228266

229267
def _clean_up_job(self, job_id: str) -> None:

airbyte_cdk/sources/declarative/retrievers/async_retriever.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
22

33

4-
from dataclasses import InitVar, dataclass
4+
from dataclasses import InitVar, dataclass, field
55
from typing import Any, Iterable, Mapping, Optional
66

77
from typing_extensions import deprecated
88

99
from airbyte_cdk.sources.declarative.async_job.job import AsyncJob
10-
from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncPartition
1110
from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector
1211
from airbyte_cdk.sources.declarative.partition_routers.async_job_partition_router import (
1312
AsyncJobPartitionRouter,
@@ -16,6 +15,7 @@
1615
from airbyte_cdk.sources.source import ExperimentalClassWarning
1716
from airbyte_cdk.sources.streams.core import StreamData
1817
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
18+
from airbyte_cdk.sources.utils.slice_logger import AlwaysLogSliceLogger
1919

2020

2121
@deprecated(
@@ -28,6 +28,10 @@ class AsyncRetriever(Retriever):
2828
parameters: InitVar[Mapping[str, Any]]
2929
record_selector: RecordSelector
3030
stream_slicer: AsyncJobPartitionRouter
31+
slice_logger: AlwaysLogSliceLogger = field(
32+
init=False,
33+
default_factory=lambda: AlwaysLogSliceLogger(),
34+
)
3135

3236
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
3337
self._parameters = parameters
@@ -75,13 +79,16 @@ def _validate_and_get_stream_slice_jobs(
7579
return stream_slice.extra_fields.get("jobs", []) if stream_slice else []
7680

7781
def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
78-
return self.stream_slicer.stream_slices()
82+
yield from self.stream_slicer.stream_slices()
7983

8084
def read_records(
8185
self,
8286
records_schema: Mapping[str, Any],
8387
stream_slice: Optional[StreamSlice] = None,
8488
) -> Iterable[StreamData]:
89+
# emit the slice_descriptor log message, for connector builder TestRead
90+
yield self.slice_logger.create_slice_log_message(stream_slice.cursor_slice) # type: ignore
91+
8592
stream_state: StreamState = self._get_stream_state()
8693
jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice)
8794
records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs)

0 commit comments

Comments
 (0)