Skip to content

Commit 73ec055

Browse files
darynaishchenkooctavia-squidington-iii
andauthored
fix(file-based-cdk): handle schema inference error for empty file (#802)
Co-authored-by: octavia-squidington-iii <contact@airbyte.com>
1 parent 041c201 commit 73ec055

File tree

3 files changed

+42
-14
lines changed

3 files changed

+42
-14
lines changed

airbyte_cdk/sources/file_based/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,3 +157,7 @@ class CustomFileBasedException(AirbyteTracedException):
157157

158158
class FileSizeLimitError(CustomFileBasedException):
159159
pass
160+
161+
162+
class EmptyFileSchemaInferenceError(AirbyteTracedException):
163+
pass

airbyte_cdk/sources/file_based/file_types/csv_parser.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@
2222
InferenceType,
2323
)
2424
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
25-
from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, RecordParseError
25+
from airbyte_cdk.sources.file_based.exceptions import (
26+
EmptyFileSchemaInferenceError,
27+
FileBasedSourceError,
28+
RecordParseError,
29+
)
2630
from airbyte_cdk.sources.file_based.file_based_stream_reader import (
2731
AbstractFileBasedStreamReader,
2832
FileReadMode,
@@ -203,7 +207,7 @@ async def infer_schema(
203207
break
204208

205209
if not type_inferrer_by_field:
206-
raise AirbyteTracedException(
210+
raise EmptyFileSchemaInferenceError(
207211
message=f"Could not infer schema as there are no rows in {file.uri}. If having an empty CSV file is expected, ignore this. "
208212
f"Else, please contact Airbyte.",
209213
failure_type=FailureType.config_error,

airbyte_cdk/sources/file_based/stream/default_file_based_stream.py

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,26 @@
99
from copy import deepcopy
1010
from functools import cache
1111
from os import path
12-
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Set, Tuple, Union
12+
from typing import (
13+
Any,
14+
Dict,
15+
Iterable,
16+
List,
17+
Mapping,
18+
MutableMapping,
19+
NoReturn,
20+
Optional,
21+
Set,
22+
Tuple,
23+
Union,
24+
)
1325

1426
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteStream, FailureType, Level
1527
from airbyte_cdk.models import Type as MessageType
1628
from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType
1729
from airbyte_cdk.sources.file_based.exceptions import (
1830
DuplicatedFilesError,
31+
EmptyFileSchemaInferenceError,
1932
FileBasedSourceError,
2033
InvalidSchemaError,
2134
MissingSchemaError,
@@ -230,7 +243,7 @@ def cursor_field(self) -> Union[str, List[str]]:
230243
return self.ab_last_mod_col
231244

232245
@cache
233-
def get_json_schema(self) -> JsonSchema:
246+
def get_json_schema(self) -> JsonSchema: # type: ignore
234247
if self.use_file_transfer:
235248
return file_transfer_schema
236249
extra_fields = {
@@ -246,12 +259,12 @@ def get_json_schema(self) -> JsonSchema:
246259
exception=AirbyteTracedException(exception=config_exception),
247260
failure_type=FailureType.config_error,
248261
)
262+
except EmptyFileSchemaInferenceError as exc:
263+
self._raise_schema_inference_error(exc)
249264
except AirbyteTracedException as ate:
250265
raise ate
251266
except Exception as exc:
252-
raise SchemaInferenceError(
253-
FileBasedSourceError.SCHEMA_INFERENCE_ERROR, stream=self.name
254-
) from exc
267+
self._raise_schema_inference_error(exc)
255268
else:
256269
return {"type": "object", "properties": {**extra_fields, **schema["properties"]}}
257270

@@ -380,17 +393,24 @@ async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
380393

381394
return base_schema
382395

383-
async def _infer_file_schema(self, file: RemoteFile) -> SchemaType:
396+
async def _infer_file_schema(self, file: RemoteFile) -> SchemaType: # type: ignore
384397
try:
385398
return await self.get_parser().infer_schema(
386399
self.config, file, self.stream_reader, self.logger
387400
)
401+
except EmptyFileSchemaInferenceError as exc:
402+
self._raise_schema_inference_error(exc, file)
388403
except AirbyteTracedException as ate:
389404
raise ate
390405
except Exception as exc:
391-
raise SchemaInferenceError(
392-
FileBasedSourceError.SCHEMA_INFERENCE_ERROR,
393-
file=file.uri,
394-
format=str(self.config.format),
395-
stream=self.name,
396-
) from exc
406+
self._raise_schema_inference_error(exc, file)
407+
408+
def _raise_schema_inference_error(
409+
self, exc: Exception, file: Optional[RemoteFile] = None
410+
) -> NoReturn:
411+
raise SchemaInferenceError(
412+
FileBasedSourceError.SCHEMA_INFERENCE_ERROR,
413+
file=file.uri if file else None,
414+
format=str(self.config.format) if self.config.format else None,
415+
stream=self.name,
416+
) from exc

0 commit comments

Comments
 (0)